diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala index 4c35152e5..c2263f9af 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala @@ -121,7 +121,7 @@ object Logs { case _: PeerConnection.PendingAuth => Some(LogCategory.CONNECTION) case _: PeerConnection.Authenticated => Some(LogCategory.CONNECTION) case _: PeerConnection.ConnectionReady => Some(LogCategory.CONNECTION) - case _: PeerConnection.InitializeConnection => Some(LogCategory.CONNECTION) + case PeerConnection.InitializeConnection => Some(LogCategory.CONNECTION) case _: PeerConnection.DelayedRebroadcast => Some(LogCategory.ROUTING_SYNC) case _: Ping => Some(LogCategory.CONNECTION) case _: Pong => Some(LogCategory.CONNECTION) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index 1ed10a64a..e785fd651 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -71,7 +71,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, switchboard: Act reconnectionTask forward p stay - case Event(PeerConnection.ConnectionReady(peerConnection, remoteNodeId1, address, outgoing, localInit, remoteInit), d: DisconnectedData) => + case Event(PeerConnection.ConnectionReady(remoteNodeId1, address, outgoing, localInit, remoteInit), d: DisconnectedData) => + val peerConnection = sender require(remoteNodeId == remoteNodeId1, s"invalid nodeid: $remoteNodeId != $remoteNodeId1") log.debug("got authenticated connection to address {}:{}", address.getHostString, address.getPort) @@ -370,7 +371,7 @@ object Peer { case object GetPeerInfo case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int) - case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: LightningMessage) + case class PeerRoutingMessage(remoteNodeId: PublicKey, message: RoutingMessage) // @formatter:on diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index 3529199cf..126b1b079 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -87,7 +87,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto import d.pendingAuth.address log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}") Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment() - switchboard ! Authenticated(self, remoteNodeId) + switchboard ! Authenticated(remoteNodeId) goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport) case Event(AuthTimeout, _) => @@ -96,8 +96,9 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto } when(BEFORE_INIT) { - case Event(InitializeConnection(peer), d: BeforeInitData) => + case Event(InitializeConnection, d: BeforeInitData) => d.transport ! TransportHandler.Listener(self) + val peer = sender Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment() val localFeatures = nodeParams.overrideFeatures.get(d.remoteNodeId) match { case Some(f) => f @@ -144,7 +145,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto stay } else { Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment() - d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit) + d.peer ! ConnectionReady(d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit) d.pendingAuth.origin_opt.foreach(origin => origin ! "connected") @@ -160,7 +161,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None if (d.nodeParams.syncWhitelist.isEmpty || d.nodeParams.syncWhitelist.contains(d.remoteNodeId)) { log.info(s"sending sync channel range query with flags_opt=$flags_opt") - router ! SendChannelQuery(nodeParams.chainHash, d.remoteNodeId, self, flags_opt = flags_opt) + router ! SendChannelQuery(nodeParams.chainHash, d.remoteNodeId, flags_opt = flags_opt) } else { log.info("not syncing with this peer") } @@ -307,7 +308,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto d.transport ! TransportHandler.ReadAck(msg) case _ => // Note: we don't ack messages here because we don't want them to be stacked in the router's mailbox - router ! Peer.PeerRoutingMessage(self, d.remoteNodeId, msg) + router ! Peer.PeerRoutingMessage(d.remoteNodeId, msg) } stay @@ -496,9 +497,9 @@ object PeerConnection { case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) { def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance } - case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) - case class InitializeConnection(peer: ActorRef) - case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init) + case class Authenticated(remoteNodeId: PublicKey) + case object InitializeConnection + case class ConnectionReady(remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init) case class DelayedRebroadcast(rebroadcast: Rebroadcast) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index c963bad92..f58e29599 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -72,7 +72,7 @@ class Switchboard(nodeParams: NodeParams, router: ActorRef, watcher: ActorRef, r case authenticated: PeerConnection.Authenticated => // if this is an incoming connection, we might not yet have created the peer val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty) - authenticated.peerConnection ! PeerConnection.InitializeConnection(peer) + sender.tell(PeerConnection.InitializeConnection, peer) // switchboard impersonates the peer, which will appear as the sender case 'peers => sender ! context.children diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 6c6405efa..27460a54f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -184,13 +184,13 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ stay using RouteCalculation.handleRouteRequest(d, nodeParams.routerConf, nodeParams.currentBlockHeight, r) // Warning: order matters here, this must be the first match for HasChainHash messages ! - case Event(PeerRoutingMessage(_, _, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash => + case Event(PeerRoutingMessage(_, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash => sender ! TransportHandler.ReadAck(routingMessage) log.warning("message {} for wrong chain {}, we're on {}", routingMessage, routingMessage.chainHash, nodeParams.chainHash) stay - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, c: ChannelAnnouncement), d) => - stay using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(peerConnection, remoteNodeId), c) + case Event(PeerRoutingMessage(remoteNodeId, c: ChannelAnnouncement), d) => + stay using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(sender, remoteNodeId), c) case Event(r: ValidateResult, d) => stay using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r) @@ -201,14 +201,14 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ case Event(n: NodeAnnouncement, d: Data) => stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n) - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, n: NodeAnnouncement), d: Data) => - stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(peerConnection, remoteNodeId)), n) + case Event(PeerRoutingMessage(remoteNodeId, n: NodeAnnouncement), d: Data) => + stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(sender, remoteNodeId)), n) case Event(u: ChannelUpdate, d: Data) => stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(LocalGossip), u) - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, u: ChannelUpdate), d) => - stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(RemoteGossip(peerConnection, remoteNodeId)), u) + case Event(PeerRoutingMessage(remoteNodeId, u: ChannelUpdate), d) => + stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(RemoteGossip(sender, remoteNodeId)), u) case Event(lcu: LocalChannelUpdate, d: Data) => stay using Validation.handleLocalChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, nodeParams.nodeId, watcher, lcu) @@ -219,19 +219,19 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ case Event(s: SendChannelQuery, d) => stay using Sync.handleSendChannelQuery(d, s) - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) => - Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q) + case Event(PeerRoutingMessage(remoteNodeId, q: QueryChannelRange), d) => + Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(sender, remoteNodeId), q) stay - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyChannelRange), d) => - stay using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), r) + case Event(PeerRoutingMessage(remoteNodeId, r: ReplyChannelRange), d) => + stay using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(sender, remoteNodeId), r) - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) => - Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q) + case Event(PeerRoutingMessage(remoteNodeId, q: QueryShortChannelIds), d) => + Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(sender, remoteNodeId), q) stay - case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) => - stay using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId), r) + case Event(PeerRoutingMessage(remoteNodeId, r: ReplyShortChannelIdsEnd), d) => + stay using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(sender, remoteNodeId), r) } @@ -364,7 +364,7 @@ object Router { // @formatter:on // @formatter:off - case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) + case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, flags_opt: Option[QueryChannelRangeTlv]) case object GetNetworkStats case class GetNetworkStatsResponse(stats: Option[NetworkStats]) case object GetRoutingState diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala index 443fc1783..0ebb5dbf8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala @@ -52,14 +52,14 @@ object Sync { // have to worry about sending a new query_channel_range when another query is still in progress val query = QueryChannelRange(s.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toList)) log.info("sending query_channel_range={}", query) - s.to ! query + ctx.sender ! query // we also set a pass-all filter for now (we can update it later) for the future gossip messages, by setting // the first_timestamp field to the current date/time and timestamp_range to the maximum value // NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching // that (i.e. the whole routing table) val filter = GossipTimestampFilter(s.chainHash, firstTimestamp = Platform.currentTime.milliseconds.toSeconds, timestampRange = Int.MaxValue) - s.to ! filter + ctx.sender ! filter // clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and // will start a new complete sync process diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala index 76463e020..cc31de702 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala @@ -1259,7 +1259,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService // then we make the announcements val announcements = channels.map(c => AnnouncementsBatchValidationSpec.makeChannelAnnouncement(c)) announcements.foreach { ann => - nodes("A").router ! PeerRoutingMessage(sender.ref, remoteNodeId, ann) + sender.send(nodes("A").router, PeerRoutingMessage(remoteNodeId, ann)) sender.expectMsg(TransportHandler.ReadAck(ann)) sender.expectMsg(GossipDecision.Accepted(ann)) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala index 6b5ca302d..2d92f829f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala @@ -73,8 +73,9 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { val probe = TestProbe() probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + switchboard.expectMsg(PeerConnection.Authenticated(remoteNodeId)) + assert(switchboard.sender() == peerConnection) + peer.send(peerConnection, PeerConnection.InitializeConnection) transport.expectMsgType[TransportHandler.Listener] val localInit = transport.expectMsgType[wire.Init] assert(localInit.networks === List(Block.RegtestGenesisBlock.hash)) @@ -85,7 +86,8 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { } else { router.expectNoMsg(1 second) } - peer.expectMsg(PeerConnection.ConnectionReady(peerConnection, remoteNodeId, address, outgoing = true, localInit, remoteInit)) + peer.expectMsg(PeerConnection.ConnectionReady(remoteNodeId, address, outgoing = true, localInit, remoteInit)) + assert(peer.sender() == peerConnection) assert(peerConnection.stateName === PeerConnection.CONNECTED) } @@ -117,7 +119,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { probe.watch(peerConnection) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + probe.send(peerConnection, PeerConnection.InitializeConnection) probe.expectTerminated(peerConnection, nodeParams.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here } @@ -127,7 +129,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + peer.send(peerConnection, PeerConnection.InitializeConnection) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[wire.Init] transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"0000 00050100000000".bits).require.value) @@ -141,7 +143,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + peer.send(peerConnection, PeerConnection.InitializeConnection) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[wire.Init] transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"00050100000000 0000".bits).require.value) @@ -167,7 +169,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { val peerConnection = TestFSMRef(new PeerConnection(nodeParams, switchboard.ref, router.ref)) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + peer.send(peerConnection, PeerConnection.InitializeConnection) transport.expectMsgType[TransportHandler.Listener] val init = transport.expectMsgType[wire.Init] assert(init.features === sentFeatures.bytes) @@ -180,7 +182,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref)) + peer.send(peerConnection, PeerConnection.InitializeConnection) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[wire.Init] transport.send(peerConnection, wire.Init(Bob.nodeParams.features, TlvStream(InitTlv.Networks(Block.LivenetGenesisBlock.hash :: Block.SegnetGenesisBlock.hash :: Nil)))) @@ -335,7 +337,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { // make sure that routing messages go through for (ann <- channels ++ updates) { transport.send(peerConnection, ann) - router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, ann)) + router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, ann)) } transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages @@ -351,7 +353,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { router.expectNoMsg(1 second) // other routing messages go through transport.send(peerConnection, query) - router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, query)) + router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, query)) // after a while the ban is lifted probe.send(peerConnection, PeerConnection.ResumeAnnouncements) @@ -359,7 +361,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods { // and announcements are processed again for (ann <- channels ++ updates) { transport.send(peerConnection, ann) - router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, ann)) + router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, ann)) } transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 63500babb..b0680c500 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -71,7 +71,7 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods { // let's simulate a connection switchboard.send(peer, Peer.Init(channels)) val localInit = wire.Init(peer.underlyingActor.nodeParams.features) - switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = true, localInit, remoteInit)) + peerConnection.send(peer, PeerConnection.ConnectionReady(remoteNodeId, fakeIPAddress.socketAddress, outgoing = true, localInit, remoteInit)) val probe = TestProbe() probe.send(peer, Peer.GetPeerInfo) assert(probe.expectMsgType[Peer.PeerInfo].state == "CONNECTED") diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala index f6f36e5b7..9c15ffa10 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala @@ -495,11 +495,10 @@ class PaymentLifecycleSpec extends BaseRouterSpec { val channelUpdate_bg = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, g, channelId_bg, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 0 msat, feeProportionalMillionths = 0, htlcMaximumMsat = 500000000 msat) val channelUpdate_gb = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_g, b, channelId_bg, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat) assert(Router.getDesc(channelUpdate_bg, chan_bg) === ChannelDesc(chan_bg.shortChannelId, priv_b.publicKey, priv_g.publicKey)) - val peerConnection = TestProbe() - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_bg) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann_g) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_bg) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_gb) + router ! PeerRoutingMessage(remoteNodeId, chan_bg) + router ! PeerRoutingMessage(remoteNodeId, ann_g) + router ! PeerRoutingMessage(remoteNodeId, channelUpdate_bg) + router ! PeerRoutingMessage(remoteNodeId, channelUpdate_gb) watcher.expectMsg(ValidateRequest(chan_bg)) watcher.send(router, ValidateResult(chan_bg, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_b, funding_g)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) watcher.expectMsgType[WatchSpentBasic] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index f38387f45..10afb35c1 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -106,26 +106,26 @@ abstract class BaseRouterSpec extends TestkitBaseClass { val nodeParams = Alice.nodeParams val router = system.actorOf(Router.props(nodeParams, watcher.ref)) // we announce channels - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_bc)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_cd)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ef)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ab)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_bc)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_cd)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ef)) // then nodes - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_a)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_b)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_c)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_d)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_e)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_f)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_a)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_b)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_c)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_d)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_e)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_f)) // then channel updates - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ba)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_bc)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_cb)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_cd)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_dc)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ef)) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_fe)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ba)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_bc)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_cb)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_cd)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_dc)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ef)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_fe)) // watcher receives the get tx requests watcher.expectMsg(ValidateRequest(chan_ab)) watcher.expectMsg(ValidateRequest(chan_bc)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 8347a3b72..033f0513e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -57,7 +57,7 @@ class RouterSpec extends BaseRouterSpec { val chan_ac = channelAnnouncement(ShortChannelId(420000, 5, 0), priv_a, priv_c, priv_funding_a, priv_funding_c) val update_ac = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, chan_ac.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat) val node_c = makeNodeAnnouncement(priv_c, "node-C", Color(123, 100, -40), Nil, hex"0200", timestamp = Platform.currentTime.milliseconds.toSeconds + 1) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ac) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ac)) peerConnection.expectNoMsg(100 millis) // we don't immediately acknowledge the announcement (back pressure) watcher.expectMsg(ValidateRequest(chan_ac)) watcher.send(router, ValidateResult(chan_ac, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))) @@ -65,10 +65,10 @@ class RouterSpec extends BaseRouterSpec { peerConnection.expectMsg(GossipDecision.Accepted(chan_ac)) assert(peerConnection.sender() == router) watcher.expectMsgType[WatchSpentBasic] - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ac) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ac)) peerConnection.expectMsg(TransportHandler.ReadAck(update_ac)) peerConnection.expectMsg(GossipDecision.Accepted(update_ac)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_c) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_c)) peerConnection.expectMsg(TransportHandler.ReadAck(node_c)) peerConnection.expectMsg(GossipDecision.Accepted(node_c)) eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(chan_ac, 1000000 sat, None, None) :: Nil)) @@ -87,12 +87,12 @@ class RouterSpec extends BaseRouterSpec { val chan_uc = channelAnnouncement(ShortChannelId(420000, 6, 0), priv_u, priv_c, priv_funding_u, priv_funding_c) val update_uc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_u, c, chan_uc.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat) val node_u = makeNodeAnnouncement(priv_u, "node-U", Color(-120, -20, 60), Nil, hex"00") - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_uc) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_uc)) peerConnection.expectNoMsg(200 millis) // we don't immediately acknowledge the announcement (back pressure) watcher.expectMsg(ValidateRequest(chan_uc)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_uc) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_uc)) peerConnection.expectMsg(TransportHandler.ReadAck(update_uc)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_u) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_u)) peerConnection.expectMsg(TransportHandler.ReadAck(node_u)) watcher.send(router, ValidateResult(chan_uc, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(2000000 sat, write(pay2wsh(Scripts.multiSig2of2(priv_funding_u.publicKey, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))) peerConnection.expectMsg(TransportHandler.ReadAck(chan_uc)) @@ -112,13 +112,13 @@ class RouterSpec extends BaseRouterSpec { { // duplicates - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_a) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_a)) peerConnection.expectMsg(TransportHandler.ReadAck(node_a)) peerConnection.expectMsg(GossipDecision.Duplicate(node_a)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ab)) peerConnection.expectMsg(TransportHandler.ReadAck(chan_ab)) peerConnection.expectMsg(GossipDecision.Duplicate(chan_ab)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab)) peerConnection.expectMsg(TransportHandler.ReadAck(update_ab)) peerConnection.expectMsg(GossipDecision.Duplicate(update_ab)) peerConnection.expectNoMsg(100 millis) @@ -131,13 +131,13 @@ class RouterSpec extends BaseRouterSpec { val invalid_node_a = node_a.copy(timestamp = node_a.timestamp + 10) val invalid_chan_a = channelAnnouncement(ShortChannelId(420000, 5, 1), priv_a, priv_c, priv_funding_a, priv_funding_c).copy(nodeId1 = randomKey.publicKey) val invalid_update_ab = update_ab.copy(cltvExpiryDelta = CltvExpiryDelta(21), timestamp = update_ab.timestamp + 1) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_node_a) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_node_a)) peerConnection.expectMsg(TransportHandler.ReadAck(invalid_node_a)) peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_node_a)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_chan_a) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_chan_a)) peerConnection.expectMsg(TransportHandler.ReadAck(invalid_chan_a)) peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_chan_a)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_update_ab) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_update_ab)) peerConnection.expectMsg(TransportHandler.ReadAck(invalid_update_ab)) peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_update_ab)) peerConnection.expectNoMsg(100 millis) @@ -151,7 +151,7 @@ class RouterSpec extends BaseRouterSpec { val priv_funding_v = randomKey val chan_vc = channelAnnouncement(ShortChannelId(420000, 7, 0), priv_v, priv_c, priv_funding_v, priv_funding_c) nodeParams.db.network.addToPruned(chan_vc.shortChannelId :: Nil) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_vc) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_vc)) peerConnection.expectMsg(TransportHandler.ReadAck(chan_vc)) peerConnection.expectMsg(GossipDecision.ChannelPruned(chan_vc)) peerConnection.expectNoMsg(100 millis) @@ -162,7 +162,7 @@ class RouterSpec extends BaseRouterSpec { { // stale channel update val update_ab = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_b.publicKey, chan_ab.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = (Platform.currentTime.milliseconds - 15.days).toSeconds) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab)) peerConnection.expectMsg(TransportHandler.ReadAck(update_ab)) peerConnection.expectMsg(GossipDecision.Stale(update_ab)) peerConnection.expectNoMsg(100 millis) @@ -175,10 +175,10 @@ class RouterSpec extends BaseRouterSpec { val priv_y = randomKey val update_ay = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_y.publicKey, ShortChannelId(4646464), CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat) val node_y = makeNodeAnnouncement(priv_y, "node-Y", Color(123, 100, -40), Nil, hex"0200") - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ay) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ay)) peerConnection.expectMsg(TransportHandler.ReadAck(update_ay)) peerConnection.expectMsg(GossipDecision.NoRelatedChannel(update_ay)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_y) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_y)) peerConnection.expectMsg(TransportHandler.ReadAck(node_y)) peerConnection.expectMsg(GossipDecision.NoKnownChannel(node_y)) peerConnection.expectNoMsg(100 millis) @@ -193,11 +193,11 @@ class RouterSpec extends BaseRouterSpec { val chan_ay = channelAnnouncement(ShortChannelId(42002), priv_a, priv_y, priv_funding_a, priv_funding_y) val update_ay = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_y.publicKey, chan_ay.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat) val node_y = makeNodeAnnouncement(priv_y, "node-Y", Color(123, 100, -40), Nil, hex"0200") - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ay) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ay)) watcher.expectMsg(ValidateRequest(chan_ay)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ay) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ay)) peerConnection.expectMsg(TransportHandler.ReadAck(update_ay)) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_y) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_y)) peerConnection.expectMsg(TransportHandler.ReadAck(node_y)) watcher.send(router, ValidateResult(chan_ay, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, randomKey.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))) peerConnection.expectMsg(TransportHandler.ReadAck(chan_ay)) @@ -213,7 +213,7 @@ class RouterSpec extends BaseRouterSpec { // validation failure val priv_x = randomKey val chan_ax = channelAnnouncement(ShortChannelId(42001), priv_a, priv_x, priv_funding_a, randomKey) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ax) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ax)) watcher.expectMsg(ValidateRequest(chan_ax)) watcher.send(router, ValidateResult(chan_ax, Left(new RuntimeException("funding tx not found")))) peerConnection.expectMsg(TransportHandler.ReadAck(chan_ax)) @@ -228,7 +228,7 @@ class RouterSpec extends BaseRouterSpec { val priv_z = randomKey val priv_funding_z = randomKey val chan_az = channelAnnouncement(ShortChannelId(42003), priv_a, priv_z, priv_funding_a, priv_funding_z) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_az) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_az)) watcher.expectMsg(ValidateRequest(chan_az)) watcher.send(router, ValidateResult(chan_az, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_z.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Spent(spendingTxConfirmed = false)))) peerConnection.expectMsg(TransportHandler.ReadAck(chan_az)) @@ -243,7 +243,7 @@ class RouterSpec extends BaseRouterSpec { val priv_z = randomKey val priv_funding_z = randomKey val chan_az = channelAnnouncement(ShortChannelId(42003), priv_a, priv_z, priv_funding_a, priv_funding_z) - router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_az) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_az)) watcher.expectMsg(ValidateRequest(chan_az)) watcher.send(router, ValidateResult(chan_az, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_z.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Spent(spendingTxConfirmed = true)))) peerConnection.expectMsg(TransportHandler.ReadAck(chan_az)) @@ -284,20 +284,20 @@ class RouterSpec extends BaseRouterSpec { test("handle bad signature for ChannelAnnouncement") { fixture => import fixture._ - val sender = TestProbe() + val peerConnection = TestProbe() val channelId_ac = ShortChannelId(420000, 5, 0) val chan_ac = channelAnnouncement(channelId_ac, priv_a, priv_c, priv_funding_a, priv_funding_c) val buggy_chan_ac = chan_ac.copy(nodeSignature1 = chan_ac.nodeSignature2) - sender.send(router, PeerRoutingMessage(sender.ref, remoteNodeId, buggy_chan_ac)) - sender.expectMsg(TransportHandler.ReadAck(buggy_chan_ac)) - sender.expectMsg(GossipDecision.InvalidSignature(buggy_chan_ac)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_chan_ac)) + peerConnection.expectMsg(TransportHandler.ReadAck(buggy_chan_ac)) + peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_chan_ac)) } test("handle bad signature for NodeAnnouncement") { fixture => import fixture._ val peerConnection = TestProbe() val buggy_ann_a = node_a.copy(signature = node_b.signature, timestamp = node_a.timestamp + 1) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, buggy_ann_a)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_ann_a)) peerConnection.expectMsg(TransportHandler.ReadAck(buggy_ann_a)) peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_ann_a)) } @@ -306,7 +306,7 @@ class RouterSpec extends BaseRouterSpec { import fixture._ val peerConnection = TestProbe() val buggy_channelUpdate_ab = update_ab.copy(signature = node_b.signature, timestamp = update_ab.timestamp + 1) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, buggy_channelUpdate_ab)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_channelUpdate_ab)) peerConnection.expectMsg(TransportHandler.ReadAck(buggy_channelUpdate_ab)) peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_channelUpdate_ab)) } @@ -369,7 +369,7 @@ class RouterSpec extends BaseRouterSpec { assert(res.hops.last.nextNodeId === d) val channelUpdate_cd1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, CltvExpiryDelta(3), 0 msat, 153000 msat, 4, 500000000L msat, enable = false) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_cd1)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, channelUpdate_cd1)) peerConnection.expectMsg(TransportHandler.ReadAck(channelUpdate_cd1)) sender.send(router, RouteRequest(a, d, DEFAULT_AMOUNT_MSAT, routeParams = relaxedRouteParams)) sender.expectMsg(Failure(RouteNotFound)) @@ -446,27 +446,28 @@ class RouterSpec extends BaseRouterSpec { val blockHeight = 400000 - 2020 val channelId = ShortChannelId(blockHeight, 5, 0) val announcement = channelAnnouncement(channelId, priv_a, priv_c, priv_funding_a, priv_funding_c) - val timestamp = (Platform.currentTime.milliseconds - 14.days - 1.day).toSeconds - val update = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 5 msat, timestamp = timestamp) - val probe = TestProbe() - probe.ignoreMsg { case _: TransportHandler.ReadAck => true } - probe.send(router, PeerRoutingMessage(probe.ref, remoteNodeId, announcement)) + val oldTimestamp = (Platform.currentTime.milliseconds - 14.days - 1.day).toSeconds + val staleUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 5 msat, timestamp = oldTimestamp) + val peerConnection = TestProbe() + peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, announcement)) watcher.expectMsgType[ValidateRequest] - probe.send(router, PeerRoutingMessage(probe.ref, remoteNodeId, update)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, staleUpdate)) watcher.send(router, ValidateResult(announcement, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + peerConnection.expectMsg(GossipDecision.Accepted(announcement)) + peerConnection.expectMsg(GossipDecision.Stale(staleUpdate)) + val probe = TestProbe() probe.send(router, TickPruneStaleChannels) val sender = TestProbe() sender.send(router, GetRoutingState) sender.expectMsgType[RoutingState] - val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = Platform.currentTime.millisecond.toSeconds) + val recentUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = Platform.currentTime.millisecond.toSeconds) // we want to make sure that transport receives the query - val peerConnection = TestProbe() - probe.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1)) - peerConnection.expectMsg(TransportHandler.ReadAck(update1)) - peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update1)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, recentUpdate)) + peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(recentUpdate)) val query = peerConnection.expectMsgType[QueryShortChannelIds] assert(query.shortChannelIds.array == List(channelId)) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index 94d167b99..bef0bd95d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.router.Announcements.{makeChannelUpdate, makeNodeAnnouncement} import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement -import fr.acinq.eclair.router.Router.{Data, GossipDecision, PublicChannel, SendChannelQuery, State} +import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.Sync._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire._ @@ -74,7 +74,6 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit } def sync(src: TestFSMRef[State, Data, Router], tgt: TestFSMRef[State, Data, Router], extendedQueryFlags_opt: Option[QueryChannelRangeTlv]): SyncResult = { - val sender = TestProbe() val pipe = TestProbe() pipe.ignoreMsg { case _: TransportHandler.ReadAck => true @@ -84,10 +83,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit } val srcId = src.underlyingActor.nodeParams.nodeId val tgtId = tgt.underlyingActor.nodeParams.nodeId - sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, extendedQueryFlags_opt)) + pipe.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, extendedQueryFlags_opt)) // src sends a query_channel_range to bob val qcr = pipe.expectMsgType[QueryChannelRange] - pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr)) + pipe.send(tgt, PeerRoutingMessage(srcId, qcr)) // this allows us to know when the last reply_channel_range has been set pipe.send(tgt, 'data) // tgt answers with reply_channel_ranges @@ -95,7 +94,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit case rcr: ReplyChannelRange => rcr } pipe.expectMsgType[Data] - rcrs.foreach(rcr => pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, rcr))) + rcrs.foreach(rcr => pipe.send(src, PeerRoutingMessage(tgtId, rcr))) // then src will now query announcements var queries = Vector.empty[QueryShortChannelIds] var channels = Vector.empty[ChannelAnnouncement] @@ -104,7 +103,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit while (src.stateData.sync.nonEmpty) { // for each chunk, src sends a query_short_channel_id val query = pipe.expectMsgType[QueryShortChannelIds] - pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, query)) + pipe.send(tgt, PeerRoutingMessage(srcId, query)) queries = queries :+ query val announcements = pipe.receiveWhile() { case c: ChannelAnnouncement => @@ -118,10 +117,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit n } // tgt replies with announcements - announcements.foreach(ann => pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, ann))) + announcements.foreach(ann => pipe.send(src, PeerRoutingMessage(tgtId, ann))) // and tgt ends this chunk with a reply_short_channel_id_end val rscie = pipe.expectMsgType[ReplyShortChannelIdsEnd] - pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, rscie)) + pipe.send(src, PeerRoutingMessage(tgtId, rscie)) } SyncResult(rcrs, queries, channels, updates, nodes) } @@ -146,11 +145,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add some channels and updates to bob and resync fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.ann)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get)) // we don't send channel_update #2 - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + sender.send(bob, PeerRoutingMessage(charlieId, na1)) + sender.send(bob, PeerRoutingMessage(charlieId, na2)) } awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10) assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -159,7 +158,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add some updates to bob and resync fakeRoutingInfo.take(10).values.foreach { case (pc, _, _) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get)) } awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2) assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10 * 2, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -168,11 +167,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add everything (duplicates will be ignored) fakeRoutingInfo.values.foreach { case (pc, na1, na2) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.ann)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, na1)) + sender.send(bob, PeerRoutingMessage(charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds) assert(BasicSyncResult(ranges = 3, queries = 13, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -194,11 +193,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add some channels and updates to bob and resync fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.ann)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get)) // we don't send channel_update #2 - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + sender.send(bob, PeerRoutingMessage(charlieId, na1)) + sender.send(bob, PeerRoutingMessage(charlieId, na2)) } awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10) assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -208,7 +207,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add some updates to bob and resync fakeRoutingInfo.take(10).values.foreach { case (pc, _, _) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get)) } awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2) assert(BasicSyncResult(ranges = 1, queries = 2, channels = 0, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -217,11 +216,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit // add everything (duplicates will be ignored) fakeRoutingInfo.values.foreach { case (pc, na1, na2) => - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) - sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.ann)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get)) + sender.send(bob, PeerRoutingMessage(charlieId, na1)) + sender.send(bob, PeerRoutingMessage(charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds) assert(BasicSyncResult(ranges = 3, queries = 11, channels = fakeRoutingInfo.size - 10, updates = 2 * (fakeRoutingInfo.size - 10), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 10) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) @@ -234,7 +233,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit } val bumpedUpdates = (List(0, 3, 7).map(touchUpdate(_, true)) ++ List(1, 3, 9).map(touchUpdate(_, false))).toSet - bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) + bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(charlieId, c))) assert(BasicSyncResult(ranges = 3, queries = 1, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 5 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes) @@ -251,29 +250,30 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit test("reset sync state on reconnection") { val params = TestConstants.Alice.nodeParams val router = TestFSMRef(new Router(params, TestProbe().ref)) - val transport = TestProbe() + val peerConnection = TestProbe() + peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } val sender = TestProbe() sender.ignoreMsg { case _: TransportHandler.ReadAck => true } val remoteNodeId = TestConstants.Bob.nodeParams.nodeId // ask router to send a channel range query - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, None)) val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None) // send first block - sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, block1)) + peerConnection.send(router, PeerRoutingMessage(remoteNodeId, block1)) // router should ask for our first block of ids - assert(transport.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty)) + assert(peerConnection.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty)) // router should think that it is missing 100 channels, in one request val Some(sync) = router.stateData.sync.get(remoteNodeId) assert(sync.total == 1) // simulate a re-connection - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, None)) sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] assert(router.stateData.sync.get(remoteNodeId).isEmpty)