1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-19 09:54:02 +01:00

Move channel collector inside Peer actor (#2688)

The `Peer` actor can now directly be queried for the list of its channels.
This makes this feature more reusable than the previous actor that was
customized for the peer-ready scenario.
This commit is contained in:
Bastien Teinturier 2023-06-13 14:40:11 +02:00 committed by GitHub
parent 05ef2f9552
commit faebbfae15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 285 additions and 119 deletions

View File

@ -293,7 +293,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = {
Future {
appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt).map { data =>
RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data)
RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, channel = ActorRef.noSender, state = CLOSED, data = data)
}
}
}

View File

@ -265,7 +265,7 @@ final case class RES_BUMP_FUNDING_FEE(rbfIndex: Int, fundingTxId: ByteVector32,
final case class RES_SPLICE(fundingTxIndex: Long, fundingTxId: ByteVector32, capacity: Satoshi, balance: MilliSatoshi) extends CommandSuccess[CMD_SPLICE]
final case class RES_GET_CHANNEL_STATE(state: ChannelState) extends CommandSuccess[CMD_GET_CHANNEL_STATE]
final case class RES_GET_CHANNEL_DATA[+D <: ChannelData](data: D) extends CommandSuccess[CMD_GET_CHANNEL_DATA]
final case class RES_GET_CHANNEL_INFO(nodeId: PublicKey, channelId: ByteVector32, state: ChannelState, data: ChannelData) extends CommandSuccess[CMD_GET_CHANNEL_INFO]
final case class RES_GET_CHANNEL_INFO(nodeId: PublicKey, channelId: ByteVector32, channel: ActorRef, state: ChannelState, data: ChannelData) extends CommandSuccess[CMD_GET_CHANNEL_INFO]
/*
8888888b. d8888 88888888888 d8888

View File

@ -1996,7 +1996,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Event(c: CMD_GET_CHANNEL_INFO, _) =>
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, stateData.channelId, stateName, stateData)
replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, stateData.channelId, self, stateName, stateData)
stay()
case Event(c: CMD_ADD_HTLC, d: PersistentChannelData) =>

View File

@ -42,6 +42,8 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scala.concurrent.duration.DurationInt
/**
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
*
@ -326,6 +328,15 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
}, d.channels.values.toSet)
stay()
case Event(r: GetPeerChannels, d) =>
if (d.channels.isEmpty) {
r.replyTo ! PeerChannels(remoteNodeId, Nil)
} else {
val actor = context.spawnAnonymous(PeerChannelsCollector(remoteNodeId))
actor ! PeerChannelsCollector.GetChannels(r.replyTo, d.channels.values.map(_.toTyped).toSet)
}
stay()
case Event(_: Peer.OutgoingMessage, _) => stay() // we got disconnected or reconnected and this message was for the previous connection
case Event(RelayOnionMessage(messageId, _, replyTo_opt), _) =>
@ -528,14 +539,16 @@ object Peer {
case class SpawnChannelNonInitiator(open: Either[protocol.OpenChannel, protocol.OpenDualFundedChannel], channelConfig: ChannelConfig, channelType: SupportedChannelType, localParams: LocalParams, peerConnection: ActorRef)
case class GetPeerInfo(replyTo: Option[typed.ActorRef[PeerInfoResponse]])
sealed trait PeerInfoResponse {
def nodeId: PublicKey
}
sealed trait PeerInfoResponse { def nodeId: PublicKey }
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Set[ActorRef]) extends PeerInfoResponse
case class PeerNotFound(nodeId: PublicKey) extends PeerInfoResponse with DisconnectResponse { override def toString: String = s"peer $nodeId not found" }
/** Return the peer's current channels: note that the data may change concurrently, never assume it is fully up-to-date. */
case class GetPeerChannels(replyTo: typed.ActorRef[PeerChannels])
case class ChannelInfo(state: ChannelState, data: ChannelData)
case class PeerChannels(nodeId: PublicKey, channels: Seq[ChannelInfo])
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
/**

View File

@ -0,0 +1,89 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.io
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, TypedActorRefOps}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.Logs
import fr.acinq.eclair.channel.{CMD_GET_CHANNEL_INFO, ChannelData, ChannelState, RES_GET_CHANNEL_INFO}
/**
* Collect the current states of a peer's channels.
* If one of the channel actors dies (e.g. because it has been closed), it will be ignored: callers may thus receive
* fewer responses than expected.
* Since channels are constantly being updated concurrently, the channel data is just a recent snapshot: callers should
* never expect this data to be fully up-to-date.
*/
object PeerChannelsCollector {
// @formatter:off
sealed trait Command
case class GetChannels(replyTo: ActorRef[Peer.PeerChannels], channels: Set[ActorRef[CMD_GET_CHANNEL_INFO]]) extends Command
private case class WrappedChannelInfo(channel: ActorRef[CMD_GET_CHANNEL_INFO], state: ChannelState, data: ChannelData) extends Command
private case class IgnoreRequest(channel: ActorRef[CMD_GET_CHANNEL_INFO]) extends Command
// @formatter:on
def apply(remoteNodeId: PublicKey): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
Behaviors.receiveMessagePartial {
case GetChannels(replyTo, channels) =>
val adapter = context.messageAdapter[RES_GET_CHANNEL_INFO](r => WrappedChannelInfo(r.channel.toTyped, r.state, r.data))
channels.foreach { c =>
context.watchWith(c, IgnoreRequest(c))
c ! CMD_GET_CHANNEL_INFO(adapter.toClassic)
}
new PeerChannelsCollector(replyTo, remoteNodeId, context).collect(channels, Nil)
}
}
}
}
}
private class PeerChannelsCollector(replyTo: ActorRef[Peer.PeerChannels], remoteNodeId: PublicKey, context: ActorContext[PeerChannelsCollector.Command]) {
import PeerChannelsCollector._
private val log = context.log
def collect(pending: Set[ActorRef[CMD_GET_CHANNEL_INFO]], received: Seq[Peer.ChannelInfo]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedChannelInfo(channel, state, data) =>
val pending1 = pending - channel
val received1 = received :+ Peer.ChannelInfo(state, data)
if (pending1.isEmpty) {
replyTo ! Peer.PeerChannels(remoteNodeId, received1)
Behaviors.stopped
} else {
collect(pending1, received1)
}
case IgnoreRequest(channel) =>
log.debug("could not fetch peer channel information, channel actor died")
val pending1 = pending - channel
if (pending1.isEmpty) {
replyTo ! Peer.PeerChannels(remoteNodeId, received)
Behaviors.stopped
} else {
collect(pending1, received)
}
}
}
}

View File

@ -17,7 +17,7 @@
package fr.acinq.eclair.io
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@ -38,7 +38,7 @@ object PeerReadyNotifier {
private case object PeerNotConnected extends Command
private case class SomePeerConnected(nodeId: PublicKey) extends Command
private case class SomePeerDisconnected(nodeId: PublicKey) extends Command
private case class PeerChannels(channels: Set[akka.actor.ActorRef]) extends Command
private case class WrappedPeerInfo(peer: ActorRef[Peer.GetPeerChannels], channelCount: Int) extends Command
private case class NewBlockNotTimedOut(currentBlockHeight: BlockHeight) extends Command
private case object CheckChannelsReady extends Command
private case class ChannelStates(states: Seq[channel.ChannelState]) extends Command
@ -81,7 +81,7 @@ object PeerReadyNotifier {
// In that case we still want to wait for a connection, because we may want to open a channel to them.
case _: Peer.PeerNotFound => PeerNotConnected
case info: Peer.PeerInfo if info.state != Peer.CONNECTED => PeerNotConnected
case info: Peer.PeerInfo => PeerChannels(info.channels)
case info: Peer.PeerInfo => WrappedPeerInfo(info.peer.toTyped, info.channels.size)
}
// We check whether the peer is already connected.
switchboard ! Switchboard.GetPeerInfo(peerInfoAdapter, remoteNodeId)
@ -96,14 +96,14 @@ object PeerReadyNotifier {
Behaviors.same
case SomePeerDisconnected(_) =>
Behaviors.same
case PeerChannels(channels) =>
if (channels.isEmpty) {
case WrappedPeerInfo(peer, channelCount) =>
if (channelCount == 0) {
context.log.info("peer is ready with no channels")
replyTo ! PeerReady(remoteNodeId, 0)
replyTo ! PeerReady(remoteNodeId, channelCount)
Behaviors.stopped
} else {
context.log.debug("peer is connected with {} channels", channels.size)
waitForChannelsReady(replyTo, remoteNodeId, channels, switchboard, context, timers)
context.log.debug("peer is connected with {} channels", channelCount)
waitForChannelsReady(replyTo, remoteNodeId, peer, switchboard, context, timers)
}
case NewBlockNotTimedOut(currentBlockHeight) =>
context.log.debug("waiting for peer to connect at block {}", currentBlockHeight)
@ -115,17 +115,15 @@ object PeerReadyNotifier {
}
}
private def waitForChannelsReady(replyTo: ActorRef[Result], remoteNodeId: PublicKey, channels: Set[akka.actor.ActorRef], switchboard: ActorRef[Switchboard.GetPeerInfo], context: ActorContext[Command], timers: TimerScheduler[Command]): Behavior[Command] = {
var channelCollector_opt = Option.empty[ActorRef[ChannelStatesCollector.Command]]
private def waitForChannelsReady(replyTo: ActorRef[Result], remoteNodeId: PublicKey, peer: ActorRef[Peer.GetPeerChannels], switchboard: ActorRef[Switchboard.GetPeerInfo], context: ActorContext[Command], timers: TimerScheduler[Command]): Behavior[Command] = {
timers.startTimerWithFixedDelay(ChannelsReadyTimerKey, CheckChannelsReady, initialDelay = 50 millis, delay = 1 second)
Behaviors.receiveMessagePartial {
case CheckChannelsReady =>
channelCollector_opt.foreach(ref => context.stop(ref))
channelCollector_opt = Some(context.spawnAnonymous(ChannelStatesCollector(context.self, channels)))
peer ! Peer.GetPeerChannels(context.messageAdapter[Peer.PeerChannels](c => ChannelStates(c.channels.map(_.state))))
Behaviors.same
case ChannelStates(states) =>
if (states.forall(isChannelReady)) {
replyTo ! PeerReady(remoteNodeId, channels.size)
replyTo ! PeerReady(remoteNodeId, states.size)
Behaviors.stopped
} else {
context.log.debug("peer has {} channels that are not ready", states.count(s => !isChannelReady(s)))
@ -182,33 +180,4 @@ object PeerReadyNotifier {
case channel.ERR_INFORMATION_LEAK => true
}
private object ChannelStatesCollector {
// @formatter:off
sealed trait Command
private final case class WrappedChannelState(wrapped: channel.RES_GET_CHANNEL_STATE) extends Command
// @formatter:on
def apply(replyTo: ActorRef[ChannelStates], channels: Set[akka.actor.ActorRef]): Behavior[Command] = {
Behaviors.setup { context =>
val channelStateAdapter = context.messageAdapter[channel.RES_GET_CHANNEL_STATE](WrappedChannelState)
channels.foreach(c => c ! channel.CMD_GET_CHANNEL_STATE(channelStateAdapter.toClassic))
collect(replyTo, Nil, channels.size)
}
}
private def collect(replyTo: ActorRef[ChannelStates], received: Seq[channel.ChannelState], remaining: Int): Behavior[Command] = {
Behaviors.receiveMessage {
case WrappedChannelState(wrapped) => remaining match {
case 1 =>
replyTo ! ChannelStates(received :+ wrapped.state)
Behaviors.stopped
case _ =>
collect(replyTo, received :+ wrapped.state, remaining - 1)
}
}
}
}
}

View File

@ -508,16 +508,16 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
register.reply(map)
val c1 = register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_GET_CHANNEL_INFO(map(c1.channelId), c1.channelId, NORMAL, ChannelCodecsSpec.normal))
register.reply(RES_GET_CHANNEL_INFO(map(c1.channelId), c1.channelId, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_FAILURE(CMD_GET_CHANNEL_INFO(ActorRef.noSender), new IllegalArgumentException("Non-standard channel")))
val c3 = register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_GET_CHANNEL_INFO(map(c3.channelId), c3.channelId, NORMAL, ChannelCodecsSpec.normal))
register.reply(RES_GET_CHANNEL_INFO(map(c3.channelId), c3.channelId, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
register.expectNoMessage()
assert(sender.expectMsgType[Iterable[RES_GET_CHANNEL_INFO]].toSet == Set(
RES_GET_CHANNEL_INFO(a, a1, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(b, b1, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(a, a1, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(b, b1, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal),
))
}
@ -539,14 +539,14 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
register.reply(channels2Nodes)
val c1 = register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c1.channelId), c1.channelId, NORMAL, ChannelCodecsSpec.normal))
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c1.channelId), c1.channelId, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
val c2 = register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c2.channelId), c2.channelId, NORMAL, ChannelCodecsSpec.normal))
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c2.channelId), c2.channelId, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
register.expectNoMessage()
assert(sender.expectMsgType[Iterable[RES_GET_CHANNEL_INFO]].toSet == Set(
RES_GET_CHANNEL_INFO(a, a1, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(a, a2, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(a, a1, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal),
RES_GET_CHANNEL_INFO(a, a2, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal),
))
}
@ -565,10 +565,10 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
eclair.channelInfo(Left(a2)).pipeTo(sender.ref)
val c1 = register.expectMsgType[Register.Forward[CMD_GET_CHANNEL_INFO]]
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c1.channelId), c1.channelId, NORMAL, ChannelCodecsSpec.normal))
register.reply(RES_GET_CHANNEL_INFO(channels2Nodes(c1.channelId), c1.channelId, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
register.expectNoMessage()
sender.expectMsg(RES_GET_CHANNEL_INFO(a, a2, NORMAL, ChannelCodecsSpec.normal))
sender.expectMsg(RES_GET_CHANNEL_INFO(a, a2, ActorRef.noSender, NORMAL, ChannelCodecsSpec.normal))
}
test("get sent payment info") { f =>

View File

@ -1,7 +1,8 @@
package fr.acinq.eclair.integration.basic
import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong}
import fr.acinq.eclair.channel.{DATA_NORMAL, NORMAL, RealScidStatus}
import fr.acinq.eclair.channel.{DATA_NORMAL, NORMAL, RealScidStatus, WAIT_FOR_FUNDING_CONFIRMED}
import fr.acinq.eclair.integration.basic.fixtures.MinimalNodeFixture.getPeerChannels
import fr.acinq.eclair.integration.basic.fixtures.composite.TwoNodesFixture
import fr.acinq.eclair.testutils.FixtureSpec
import fr.acinq.eclair.{BlockHeight, MilliSatoshiLong}
@ -43,6 +44,19 @@ class TwoNodesIntegrationSpec extends FixtureSpec with IntegrationPatience {
assert(getChannelState(bob, channelId) == NORMAL)
}
test("open multiple channels alice-bob") { f =>
import f._
connect(alice, bob)
val channelId1 = openChannel(alice, bob, 100_000 sat).channelId
val channelId2 = openChannel(bob, alice, 110_000 sat).channelId
val channels = getPeerChannels(alice, bob.nodeId)
assert(channels.map(_.data.channelId).toSet == Set(channelId1, channelId2))
channels.foreach(c => assert(c.state == WAIT_FOR_FUNDING_CONFIRMED))
confirmChannel(alice, bob, channelId1, BlockHeight(420_000), 21)
confirmChannel(bob, alice, channelId2, BlockHeight(420_000), 22)
getPeerChannels(bob, alice.nodeId).foreach(c => assert(c.data.isInstanceOf[DATA_NORMAL]))
}
test("open a channel alice-bob (autoconfirm)") { f =>
import f._
alice.watcher.setAutoPilot(watcherAutopilot(knownFundingTxs(alice)))

View File

@ -7,6 +7,7 @@ import akka.actor.{ActorRef, ActorSystem, typed}
import akka.testkit.{TestActor, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, Satoshi, SatoshiLong, Transaction}
import fr.acinq.eclair.ShortChannelId.txIndex
import fr.acinq.eclair.blockchain.DummyOnChainWallet
@ -252,6 +253,14 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
sender.expectMsgType[RES_GET_CHANNEL_DATA[ChannelData]].data
}
def getPeerChannels(node: MinimalNodeFixture, remoteNodeId: PublicKey)(implicit system: ActorSystem): Seq[Peer.ChannelInfo] = {
val sender = TestProbe("sender")
node.switchboard ! Switchboard.GetPeerInfo(sender.ref.toTyped, remoteNodeId)
val peer = sender.expectMsgType[Peer.PeerInfo].peer
peer ! Peer.GetPeerChannels(sender.ref.toTyped)
sender.expectMsgType[Peer.PeerChannels].channels
}
def getRouterData(node: MinimalNodeFixture)(implicit system: ActorSystem): Router.Data = {
val sender = TestProbe("sender")
sender.send(node.router, Router.GetRouterData)

View File

@ -0,0 +1,69 @@
package fr.acinq.eclair.io
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.PeerChannelsCollector.GetChannels
import fr.acinq.eclair.{randomBytes32, randomKey}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import scala.concurrent.duration.DurationInt
class PeerChannelsCollectorSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
case class FixtureParam(remoteNodeId: PublicKey, collector: ActorRef[PeerChannelsCollector.Command], probe: TestProbe[Peer.PeerChannels])
override def withFixture(test: OneArgTest): Outcome = {
val remoteNodeId = randomKey().publicKey
val probe = TestProbe[Peer.PeerChannels]()
val collector = testKit.spawn(PeerChannelsCollector(remoteNodeId))
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, collector, probe)))
}
test("query multiple channels") { f =>
import f._
val (channel1, channel2, channel3) = (TestProbe[CMD_GET_CHANNEL_INFO](), TestProbe[CMD_GET_CHANNEL_INFO](), TestProbe[CMD_GET_CHANNEL_INFO]())
collector ! GetChannels(probe.ref, Set(channel1, channel2, channel3).map(_.ref))
val request1 = channel1.expectMessageType[CMD_GET_CHANNEL_INFO]
val request2 = channel2.expectMessageType[CMD_GET_CHANNEL_INFO]
val request3 = channel3.expectMessageType[CMD_GET_CHANNEL_INFO]
request1.replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, randomBytes32(), channel1.ref.toClassic, NORMAL, null)
request2.replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, randomBytes32(), channel2.ref.toClassic, WAIT_FOR_FUNDING_CONFIRMED, null)
probe.expectNoMessage(100 millis) // we don't send a response back until we receive responses from all channels
request3.replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, randomBytes32(), channel3.ref.toClassic, CLOSING, null)
val peerChannels = probe.expectMessageType[Peer.PeerChannels]
assert(peerChannels.nodeId == remoteNodeId)
assert(peerChannels.channels.map(_.state).toSet == Set(NORMAL, WAIT_FOR_FUNDING_CONFIRMED, CLOSING))
}
test("channel dies before request") { f =>
import f._
val (channel1, channel2) = (TestProbe[CMD_GET_CHANNEL_INFO](), TestProbe[CMD_GET_CHANNEL_INFO]())
channel1.stop()
collector ! GetChannels(probe.ref, Set(channel1, channel2).map(_.ref))
val request2 = channel2.expectMessageType[CMD_GET_CHANNEL_INFO]
probe.expectNoMessage(100 millis)
request2.replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, randomBytes32(), channel2.ref.toClassic, NORMAL, null)
assert(probe.expectMessageType[Peer.PeerChannels].channels.size == 1)
}
test("channel dies after request") { f =>
import f._
val (channel1, channel2) = (TestProbe[CMD_GET_CHANNEL_INFO](), TestProbe[CMD_GET_CHANNEL_INFO]())
collector ! GetChannels(probe.ref, Set(channel1, channel2).map(_.ref))
channel1.expectMessageType[CMD_GET_CHANNEL_INFO]
channel1.stop()
val request2 = channel2.expectMessageType[CMD_GET_CHANNEL_INFO]
probe.expectNoMessage(100 millis)
request2.replyTo ! RES_GET_CHANNEL_INFO(remoteNodeId, randomBytes32(), channel2.ref.toClassic, NORMAL, null)
assert(probe.expectMessageType[Peer.PeerChannels].channels.size == 1)
}
}

View File

@ -32,16 +32,14 @@ import scala.concurrent.duration.DurationInt
class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
case class FixtureParam(remoteNodeId: PublicKey, switchboard: TestProbe[Switchboard.GetPeerInfo], channelProbes: Seq[TestProbe[CMD_GET_CHANNEL_STATE]], probe: TestProbe[PeerReadyNotifier.Result]) {
def channels: Set[akka.actor.ActorRef] = channelProbes.map(_.ref.toClassic).toSet
}
case class FixtureParam(remoteNodeId: PublicKey, switchboard: TestProbe[Switchboard.GetPeerInfo], peer: TestProbe[Peer.GetPeerChannels], probe: TestProbe[PeerReadyNotifier.Result])
override def withFixture(test: OneArgTest): Outcome = {
val remoteNodeId = randomKey().publicKey
val peer = TestProbe[Peer.GetPeerChannels]("peer")
val switchboard = TestProbe[Switchboard.GetPeerInfo]("switchboard")
val channelProbes = Seq(TestProbe[CMD_GET_CHANNEL_STATE]("channel1"), TestProbe[CMD_GET_CHANNEL_STATE]("channel2"))
val probe = TestProbe[PeerReadyNotifier.Result]()
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, switchboard, channelProbes, probe)))
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, switchboard, peer, probe)))
}
test("peer not connected (duration timeout)") { f =>
@ -75,7 +73,7 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Right(BlockHeight(500)))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set.empty)
request.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set.empty)
probe.expectMessage(PeerReady(remoteNodeId, 0))
}
@ -84,21 +82,25 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Right(BlockHeight(500)))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic, TestProbe().ref.toClassic))
// Channels are not ready yet.
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(SYNCING))
val channels2 = Seq(Peer.ChannelInfo(SYNCING, null), Peer.ChannelInfo(SYNCING, null))
val request2 = peer.expectMessageType[Peer.GetPeerChannels]
request2.replyTo ! Peer.PeerChannels(remoteNodeId, channels2)
probe.expectNoMessage(100 millis)
// After the first retry, one of the channels is ready but not the second one.
channelProbes.head.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NORMAL)
channelProbes.last.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(SYNCING)
val channels3 = Seq(Peer.ChannelInfo(NORMAL, null), Peer.ChannelInfo(SYNCING, null))
val request3 = peer.expectMessageType[Peer.GetPeerChannels]
request3.replyTo ! Peer.PeerChannels(remoteNodeId, channels3)
probe.expectNoMessage(100 millis)
// After the second retry, both channels are ready.
channelProbes.head.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NORMAL)
channelProbes.last.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(SHUTDOWN)
val channels4 = Seq(Peer.ChannelInfo(NORMAL, null), Peer.ChannelInfo(SHUTDOWN, null))
val request4 = peer.expectMessageType[Peer.GetPeerChannels]
request4.replyTo ! Peer.PeerChannels(remoteNodeId, channels4)
probe.expectMessage(PeerReady(remoteNodeId, 2))
}
@ -108,8 +110,8 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Right(BlockHeight(500)))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
channelProbes.head.expectNoMessage(100 millis)
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic, TestProbe().ref.toClassic))
peer.expectNoMessage(100 millis)
// An unrelated peer connects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, randomKey().publicKey, null))
@ -118,9 +120,10 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
// The target peer connects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NEGOTIATING))
probe.expectMessage(PeerReady(remoteNodeId, 2))
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic, TestProbe().ref.toClassic))
val channels = Seq(Peer.ChannelInfo(NEGOTIATING, null))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, channels)
probe.expectMessage(PeerReady(remoteNodeId, 1))
}
test("peer connects then disconnects") { f =>
@ -130,20 +133,21 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
notifier ! NotifyWhenPeerReady(probe.ref)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerNotFound(remoteNodeId)
channelProbes.head.expectNoMessage(100 millis)
peer.expectNoMessage(100 millis)
// The target peer connects and instantly disconnects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
channelProbes.head.expectNoMessage(100 millis)
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectNoMessage(100 millis)
// The target peer reconnects and stays connected.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
val request3 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request3.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NEGOTIATING))
probe.expectMessage(PeerReady(remoteNodeId, 2))
request3.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
val channels = Seq(Peer.ChannelInfo(CLOSING, null))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, channels)
probe.expectMessage(PeerReady(remoteNodeId, 1))
}
test("peer connects then disconnects (while waiting for channel states)") { f =>
@ -152,21 +156,22 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Right(BlockHeight(500)))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
channelProbes.head.expectNoMessage(100 millis)
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set.empty)
peer.expectNoMessage(100 millis)
// The target peer connects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE])
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels]
// The target peer disconnects, so we wait for them to connect again.
system.eventStream ! EventStream.Publish(PeerDisconnected(TestProbe().ref.toClassic, remoteNodeId))
val request3 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request3.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NORMAL))
probe.expectMessage(PeerReady(remoteNodeId, 2))
request3.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
val channels = Seq(Peer.ChannelInfo(NORMAL, null))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, channels)
probe.expectMessage(PeerReady(remoteNodeId, 1))
}
test("peer connected (duration timeout)") { f =>
@ -175,8 +180,8 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Left(100 millis))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE])
request.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels]
probe.expectMessage(PeerUnavailable(remoteNodeId))
}
@ -186,8 +191,8 @@ class PeerReadyNotifierSpec extends ScalaTestWithActorTestKit(ConfigFactory.load
val notifier = testKit.spawn(PeerReadyNotifier(remoteNodeId, switchboard.ref, timeout_opt = Some(Right(BlockHeight(100)))))
notifier ! NotifyWhenPeerReady(probe.ref)
val request = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channelProbes.foreach(_.expectMessageType[CMD_GET_CHANNEL_STATE])
request.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels]
system.eventStream ! EventStream.Publish(CurrentBlockHeight(BlockHeight(100)))
probe.expectMessage(PeerUnavailable(remoteNodeId))
}

View File

@ -8,7 +8,7 @@ import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.channel.{CMD_GET_CHANNEL_STATE, NEGOTIATING, RES_GET_CHANNEL_STATE}
import fr.acinq.eclair.channel.NEGOTIATING
import fr.acinq.eclair.io.Switchboard.GetPeerInfo
import fr.acinq.eclair.io.{Peer, PeerConnected, Switchboard}
import fr.acinq.eclair.payment.relay.AsyncPaymentTriggerer._
@ -20,18 +20,16 @@ import scala.concurrent.duration.DurationInt
class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
case class FixtureParam(remoteNodeId: PublicKey, switchboard: TestProbe[Switchboard.GetPeerInfo], channel: TestProbe[CMD_GET_CHANNEL_STATE], probe: TestProbe[Result], triggerer: ActorRef[Command]) {
def channels: Set[akka.actor.ActorRef] = Set(channel.ref.toClassic)
}
case class FixtureParam(remoteNodeId: PublicKey, switchboard: TestProbe[Switchboard.GetPeerInfo], peer: TestProbe[Peer.GetPeerChannels], probe: TestProbe[Result], triggerer: ActorRef[Command])
override def withFixture(test: OneArgTest): Outcome = {
val remoteNodeId = TestConstants.Alice.nodeParams.nodeId
val switchboard = TestProbe[Switchboard.GetPeerInfo]("switchboard")
val channel = TestProbe[CMD_GET_CHANNEL_STATE]("channel")
val peer = TestProbe[Peer.GetPeerChannels]("peer")
val probe = TestProbe[Result]()
val triggerer = testKit.spawn(AsyncPaymentTriggerer())
triggerer ! Start(switchboard.ref)
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, switchboard, channel, probe, triggerer)))
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, switchboard, peer, probe, triggerer)))
}
test("remote node does not connect before timeout") { f =>
@ -112,21 +110,21 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))
// An unrelated peer connects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, randomKey().publicKey, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, randomKey().publicKey, null))
probe.expectNoMessage(100 millis)
// The target peer connects.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, remoteNodeId, null))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channel.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NEGOTIATING)
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, Seq(Peer.ChannelInfo(NEGOTIATING, null)))
probe.expectMessage(AsyncPaymentTriggered)
// Only get the trigger message once.
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, remoteNodeId, null))
probe.expectNoMessage(100 millis)
}
@ -135,7 +133,7 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))
// Another async payment node relay watches the peer
val probe2 = TestProbe[Result]()
@ -146,10 +144,10 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
probe.expectMessage(AsyncPaymentTimeout)
// Second watch succeeds
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, remoteNodeId, null))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.CONNECTED, None, channels)
channel.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NEGOTIATING)
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, Seq(Peer.ChannelInfo(NEGOTIATING, null)))
probe.expectNoMessage(100 millis)
probe2.expectMessage(AsyncPaymentTriggered)
}
@ -160,28 +158,28 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
// watch remote node
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))
// watch another remote node
val remoteNodeId2 = TestConstants.Bob.nodeParams.nodeId
val probe2 = TestProbe[Result]()
triggerer ! Watch(probe2.ref, remoteNodeId2, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(101))
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, channels)
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))
// First remote node times out
system.eventStream ! EventStream.Publish(CurrentBlockHeight(BlockHeight(100)))
probe.expectMessage(AsyncPaymentTimeout)
// First remote node connects, but does not trigger expired watch
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, remoteNodeId, null))
// Second remote node connects and triggers watch
system.eventStream ! EventStream.Publish(PeerConnected(TestProbe().ref.toClassic, remoteNodeId2, null))
system.eventStream ! EventStream.Publish(PeerConnected(peer.ref.toClassic, remoteNodeId2, null))
val request3 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
assert(request3.remoteNodeId == remoteNodeId2)
request3.replyTo ! Peer.PeerInfo(TestProbe().ref.toClassic, remoteNodeId2, Peer.CONNECTED, None, channels)
channel.expectMessageType[CMD_GET_CHANNEL_STATE].replyTo ! RES_GET_CHANNEL_STATE(NEGOTIATING)
request3.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId2, Peer.CONNECTED, None, Set(TestProbe().ref.toClassic))
peer.expectMessageType[Peer.GetPeerChannels].replyTo ! Peer.PeerChannels(remoteNodeId, Seq(Peer.ChannelInfo(NEGOTIATING, null)))
probe.expectNoMessage(100 millis)
probe2.expectMessage(AsyncPaymentTriggered)
}