mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-23 14:40:34 +01:00
Use sender instead of providing actor refs (#1379)
In some messages, we provide a reference to the sending actor byt adding `self` in the message, but we could use `sender` instead.
This commit is contained in:
parent
9103b99077
commit
86ee7eaabb
13 changed files with 144 additions and 140 deletions
|
@ -121,7 +121,7 @@ object Logs {
|
|||
case _: PeerConnection.PendingAuth => Some(LogCategory.CONNECTION)
|
||||
case _: PeerConnection.Authenticated => Some(LogCategory.CONNECTION)
|
||||
case _: PeerConnection.ConnectionReady => Some(LogCategory.CONNECTION)
|
||||
case _: PeerConnection.InitializeConnection => Some(LogCategory.CONNECTION)
|
||||
case PeerConnection.InitializeConnection => Some(LogCategory.CONNECTION)
|
||||
case _: PeerConnection.DelayedRebroadcast => Some(LogCategory.ROUTING_SYNC)
|
||||
case _: Ping => Some(LogCategory.CONNECTION)
|
||||
case _: Pong => Some(LogCategory.CONNECTION)
|
||||
|
|
|
@ -71,7 +71,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, switchboard: Act
|
|||
reconnectionTask forward p
|
||||
stay
|
||||
|
||||
case Event(PeerConnection.ConnectionReady(peerConnection, remoteNodeId1, address, outgoing, localInit, remoteInit), d: DisconnectedData) =>
|
||||
case Event(PeerConnection.ConnectionReady(remoteNodeId1, address, outgoing, localInit, remoteInit), d: DisconnectedData) =>
|
||||
val peerConnection = sender
|
||||
require(remoteNodeId == remoteNodeId1, s"invalid nodeid: $remoteNodeId != $remoteNodeId1")
|
||||
log.debug("got authenticated connection to address {}:{}", address.getHostString, address.getPort)
|
||||
|
||||
|
@ -370,7 +371,7 @@ object Peer {
|
|||
case object GetPeerInfo
|
||||
case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int)
|
||||
|
||||
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: LightningMessage)
|
||||
case class PeerRoutingMessage(remoteNodeId: PublicKey, message: RoutingMessage)
|
||||
|
||||
// @formatter:on
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto
|
|||
import d.pendingAuth.address
|
||||
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}")
|
||||
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment()
|
||||
switchboard ! Authenticated(self, remoteNodeId)
|
||||
switchboard ! Authenticated(remoteNodeId)
|
||||
goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport)
|
||||
|
||||
case Event(AuthTimeout, _) =>
|
||||
|
@ -96,8 +96,9 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto
|
|||
}
|
||||
|
||||
when(BEFORE_INIT) {
|
||||
case Event(InitializeConnection(peer), d: BeforeInitData) =>
|
||||
case Event(InitializeConnection, d: BeforeInitData) =>
|
||||
d.transport ! TransportHandler.Listener(self)
|
||||
val peer = sender
|
||||
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
|
||||
val localFeatures = nodeParams.overrideFeatures.get(d.remoteNodeId) match {
|
||||
case Some(f) => f
|
||||
|
@ -144,7 +145,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto
|
|||
stay
|
||||
} else {
|
||||
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment()
|
||||
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
|
||||
d.peer ! ConnectionReady(d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
|
||||
|
||||
d.pendingAuth.origin_opt.foreach(origin => origin ! "connected")
|
||||
|
||||
|
@ -160,7 +161,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto
|
|||
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
|
||||
if (d.nodeParams.syncWhitelist.isEmpty || d.nodeParams.syncWhitelist.contains(d.remoteNodeId)) {
|
||||
log.info(s"sending sync channel range query with flags_opt=$flags_opt")
|
||||
router ! SendChannelQuery(nodeParams.chainHash, d.remoteNodeId, self, flags_opt = flags_opt)
|
||||
router ! SendChannelQuery(nodeParams.chainHash, d.remoteNodeId, flags_opt = flags_opt)
|
||||
} else {
|
||||
log.info("not syncing with this peer")
|
||||
}
|
||||
|
@ -307,7 +308,7 @@ class PeerConnection(nodeParams: NodeParams, switchboard: ActorRef, router: Acto
|
|||
d.transport ! TransportHandler.ReadAck(msg)
|
||||
case _ =>
|
||||
// Note: we don't ack messages here because we don't want them to be stacked in the router's mailbox
|
||||
router ! Peer.PeerRoutingMessage(self, d.remoteNodeId, msg)
|
||||
router ! Peer.PeerRoutingMessage(d.remoteNodeId, msg)
|
||||
}
|
||||
stay
|
||||
|
||||
|
@ -496,9 +497,9 @@ object PeerConnection {
|
|||
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) {
|
||||
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
|
||||
}
|
||||
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey)
|
||||
case class InitializeConnection(peer: ActorRef)
|
||||
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init)
|
||||
case class Authenticated(remoteNodeId: PublicKey)
|
||||
case object InitializeConnection
|
||||
case class ConnectionReady(remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init)
|
||||
|
||||
case class DelayedRebroadcast(rebroadcast: Rebroadcast)
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ class Switchboard(nodeParams: NodeParams, router: ActorRef, watcher: ActorRef, r
|
|||
case authenticated: PeerConnection.Authenticated =>
|
||||
// if this is an incoming connection, we might not yet have created the peer
|
||||
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty)
|
||||
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer)
|
||||
sender.tell(PeerConnection.InitializeConnection, peer) // switchboard impersonates the peer, which will appear as the sender
|
||||
|
||||
case 'peers => sender ! context.children
|
||||
|
||||
|
|
|
@ -184,13 +184,13 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
|
|||
stay using RouteCalculation.handleRouteRequest(d, nodeParams.routerConf, nodeParams.currentBlockHeight, r)
|
||||
|
||||
// Warning: order matters here, this must be the first match for HasChainHash messages !
|
||||
case Event(PeerRoutingMessage(_, _, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash =>
|
||||
case Event(PeerRoutingMessage(_, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
log.warning("message {} for wrong chain {}, we're on {}", routingMessage, routingMessage.chainHash, nodeParams.chainHash)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, c: ChannelAnnouncement), d) =>
|
||||
stay using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(peerConnection, remoteNodeId), c)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, c: ChannelAnnouncement), d) =>
|
||||
stay using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(sender, remoteNodeId), c)
|
||||
|
||||
case Event(r: ValidateResult, d) =>
|
||||
stay using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
|
||||
|
@ -201,14 +201,14 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
|
|||
case Event(n: NodeAnnouncement, d: Data) =>
|
||||
stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, n: NodeAnnouncement), d: Data) =>
|
||||
stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(peerConnection, remoteNodeId)), n)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, n: NodeAnnouncement), d: Data) =>
|
||||
stay using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(sender, remoteNodeId)), n)
|
||||
|
||||
case Event(u: ChannelUpdate, d: Data) =>
|
||||
stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(LocalGossip), u)
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, u: ChannelUpdate), d) =>
|
||||
stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(RemoteGossip(peerConnection, remoteNodeId)), u)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, u: ChannelUpdate), d) =>
|
||||
stay using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Set(RemoteGossip(sender, remoteNodeId)), u)
|
||||
|
||||
case Event(lcu: LocalChannelUpdate, d: Data) =>
|
||||
stay using Validation.handleLocalChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, nodeParams.nodeId, watcher, lcu)
|
||||
|
@ -219,19 +219,19 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
|
|||
case Event(s: SendChannelQuery, d) =>
|
||||
stay using Sync.handleSendChannelQuery(d, s)
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) =>
|
||||
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, q: QueryChannelRange), d) =>
|
||||
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(sender, remoteNodeId), q)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyChannelRange), d) =>
|
||||
stay using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), r)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, r: ReplyChannelRange), d) =>
|
||||
stay using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(sender, remoteNodeId), r)
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) =>
|
||||
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, q: QueryShortChannelIds), d) =>
|
||||
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(sender, remoteNodeId), q)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) =>
|
||||
stay using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId), r)
|
||||
case Event(PeerRoutingMessage(remoteNodeId, r: ReplyShortChannelIdsEnd), d) =>
|
||||
stay using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(sender, remoteNodeId), r)
|
||||
|
||||
}
|
||||
|
||||
|
@ -364,7 +364,7 @@ object Router {
|
|||
// @formatter:on
|
||||
|
||||
// @formatter:off
|
||||
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv])
|
||||
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, flags_opt: Option[QueryChannelRangeTlv])
|
||||
case object GetNetworkStats
|
||||
case class GetNetworkStatsResponse(stats: Option[NetworkStats])
|
||||
case object GetRoutingState
|
||||
|
|
|
@ -52,14 +52,14 @@ object Sync {
|
|||
// have to worry about sending a new query_channel_range when another query is still in progress
|
||||
val query = QueryChannelRange(s.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toList))
|
||||
log.info("sending query_channel_range={}", query)
|
||||
s.to ! query
|
||||
ctx.sender ! query
|
||||
|
||||
// we also set a pass-all filter for now (we can update it later) for the future gossip messages, by setting
|
||||
// the first_timestamp field to the current date/time and timestamp_range to the maximum value
|
||||
// NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching
|
||||
// that (i.e. the whole routing table)
|
||||
val filter = GossipTimestampFilter(s.chainHash, firstTimestamp = Platform.currentTime.milliseconds.toSeconds, timestampRange = Int.MaxValue)
|
||||
s.to ! filter
|
||||
ctx.sender ! filter
|
||||
|
||||
// clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and
|
||||
// will start a new complete sync process
|
||||
|
|
|
@ -1259,7 +1259,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
|||
// then we make the announcements
|
||||
val announcements = channels.map(c => AnnouncementsBatchValidationSpec.makeChannelAnnouncement(c))
|
||||
announcements.foreach { ann =>
|
||||
nodes("A").router ! PeerRoutingMessage(sender.ref, remoteNodeId, ann)
|
||||
sender.send(nodes("A").router, PeerRoutingMessage(remoteNodeId, ann))
|
||||
sender.expectMsg(TransportHandler.ReadAck(ann))
|
||||
sender.expectMsg(GossipDecision.Accepted(ann))
|
||||
}
|
||||
|
|
|
@ -73,8 +73,9 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
val probe = TestProbe()
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
switchboard.expectMsg(PeerConnection.Authenticated(remoteNodeId))
|
||||
assert(switchboard.sender() == peerConnection)
|
||||
peer.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
val localInit = transport.expectMsgType[wire.Init]
|
||||
assert(localInit.networks === List(Block.RegtestGenesisBlock.hash))
|
||||
|
@ -85,7 +86,8 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
} else {
|
||||
router.expectNoMsg(1 second)
|
||||
}
|
||||
peer.expectMsg(PeerConnection.ConnectionReady(peerConnection, remoteNodeId, address, outgoing = true, localInit, remoteInit))
|
||||
peer.expectMsg(PeerConnection.ConnectionReady(remoteNodeId, address, outgoing = true, localInit, remoteInit))
|
||||
assert(peer.sender() == peerConnection)
|
||||
assert(peerConnection.stateName === PeerConnection.CONNECTED)
|
||||
}
|
||||
|
||||
|
@ -117,7 +119,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
probe.watch(peerConnection)
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
probe.expectTerminated(peerConnection, nodeParams.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
|
||||
}
|
||||
|
||||
|
@ -127,7 +129,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
probe.watch(transport.ref)
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
peer.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
transport.expectMsgType[wire.Init]
|
||||
transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"0000 00050100000000".bits).require.value)
|
||||
|
@ -141,7 +143,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
probe.watch(transport.ref)
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
peer.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
transport.expectMsgType[wire.Init]
|
||||
transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"00050100000000 0000".bits).require.value)
|
||||
|
@ -167,7 +169,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
val peerConnection = TestFSMRef(new PeerConnection(nodeParams, switchboard.ref, router.ref))
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
peer.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
val init = transport.expectMsgType[wire.Init]
|
||||
assert(init.features === sentFeatures.bytes)
|
||||
|
@ -180,7 +182,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
probe.watch(transport.ref)
|
||||
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
|
||||
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
|
||||
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref))
|
||||
peer.send(peerConnection, PeerConnection.InitializeConnection)
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
transport.expectMsgType[wire.Init]
|
||||
transport.send(peerConnection, wire.Init(Bob.nodeParams.features, TlvStream(InitTlv.Networks(Block.LivenetGenesisBlock.hash :: Block.SegnetGenesisBlock.hash :: Nil))))
|
||||
|
@ -335,7 +337,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
// make sure that routing messages go through
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peerConnection, ann)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, ann))
|
||||
router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, ann))
|
||||
}
|
||||
transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages
|
||||
|
||||
|
@ -351,7 +353,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
router.expectNoMsg(1 second)
|
||||
// other routing messages go through
|
||||
transport.send(peerConnection, query)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, query))
|
||||
router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, query))
|
||||
|
||||
// after a while the ban is lifted
|
||||
probe.send(peerConnection, PeerConnection.ResumeAnnouncements)
|
||||
|
@ -359,7 +361,7 @@ class PeerConnectionSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
// and announcements are processed again
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peerConnection, ann)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(peerConnection, remoteNodeId, ann))
|
||||
router.expectMsg(Peer.PeerRoutingMessage(remoteNodeId, ann))
|
||||
}
|
||||
transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
// let's simulate a connection
|
||||
switchboard.send(peer, Peer.Init(channels))
|
||||
val localInit = wire.Init(peer.underlyingActor.nodeParams.features)
|
||||
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = true, localInit, remoteInit))
|
||||
peerConnection.send(peer, PeerConnection.ConnectionReady(remoteNodeId, fakeIPAddress.socketAddress, outgoing = true, localInit, remoteInit))
|
||||
val probe = TestProbe()
|
||||
probe.send(peer, Peer.GetPeerInfo)
|
||||
assert(probe.expectMsgType[Peer.PeerInfo].state == "CONNECTED")
|
||||
|
|
|
@ -495,11 +495,10 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
|
|||
val channelUpdate_bg = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, g, channelId_bg, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 0 msat, feeProportionalMillionths = 0, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_gb = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_g, b, channelId_bg, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat)
|
||||
assert(Router.getDesc(channelUpdate_bg, chan_bg) === ChannelDesc(chan_bg.shortChannelId, priv_b.publicKey, priv_g.publicKey))
|
||||
val peerConnection = TestProbe()
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_bg)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann_g)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_bg)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_gb)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_bg)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_g)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_bg)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_gb)
|
||||
watcher.expectMsg(ValidateRequest(chan_bg))
|
||||
watcher.send(router, ValidateResult(chan_bg, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_b, funding_g)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
|
||||
watcher.expectMsgType[WatchSpentBasic]
|
||||
|
|
|
@ -106,26 +106,26 @@ abstract class BaseRouterSpec extends TestkitBaseClass {
|
|||
val nodeParams = Alice.nodeParams
|
||||
val router = system.actorOf(Router.props(nodeParams, watcher.ref))
|
||||
// we announce channels
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_bc))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_cd))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ef))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ab))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_bc))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_cd))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ef))
|
||||
// then nodes
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_a))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_b))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_c))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_d))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_e))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_f))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_a))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_b))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_c))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_d))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_e))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_f))
|
||||
// then channel updates
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ba))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_bc))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_cb))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_cd))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_dc))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ef))
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_fe))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ba))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_bc))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_cb))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_cd))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_dc))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ef))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_fe))
|
||||
// watcher receives the get tx requests
|
||||
watcher.expectMsg(ValidateRequest(chan_ab))
|
||||
watcher.expectMsg(ValidateRequest(chan_bc))
|
||||
|
|
|
@ -57,7 +57,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val chan_ac = channelAnnouncement(ShortChannelId(420000, 5, 0), priv_a, priv_c, priv_funding_a, priv_funding_c)
|
||||
val update_ac = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, chan_ac.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat)
|
||||
val node_c = makeNodeAnnouncement(priv_c, "node-C", Color(123, 100, -40), Nil, hex"0200", timestamp = Platform.currentTime.milliseconds.toSeconds + 1)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ac)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ac))
|
||||
peerConnection.expectNoMsg(100 millis) // we don't immediately acknowledge the announcement (back pressure)
|
||||
watcher.expectMsg(ValidateRequest(chan_ac))
|
||||
watcher.send(router, ValidateResult(chan_ac, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))
|
||||
|
@ -65,10 +65,10 @@ class RouterSpec extends BaseRouterSpec {
|
|||
peerConnection.expectMsg(GossipDecision.Accepted(chan_ac))
|
||||
assert(peerConnection.sender() == router)
|
||||
watcher.expectMsgType[WatchSpentBasic]
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ac)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ac))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_ac))
|
||||
peerConnection.expectMsg(GossipDecision.Accepted(update_ac))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_c)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_c))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(node_c))
|
||||
peerConnection.expectMsg(GossipDecision.Accepted(node_c))
|
||||
eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(chan_ac, 1000000 sat, None, None) :: Nil))
|
||||
|
@ -87,12 +87,12 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val chan_uc = channelAnnouncement(ShortChannelId(420000, 6, 0), priv_u, priv_c, priv_funding_u, priv_funding_c)
|
||||
val update_uc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_u, c, chan_uc.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat)
|
||||
val node_u = makeNodeAnnouncement(priv_u, "node-U", Color(-120, -20, 60), Nil, hex"00")
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_uc)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_uc))
|
||||
peerConnection.expectNoMsg(200 millis) // we don't immediately acknowledge the announcement (back pressure)
|
||||
watcher.expectMsg(ValidateRequest(chan_uc))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_uc)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_uc))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_uc))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_u)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_u))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(node_u))
|
||||
watcher.send(router, ValidateResult(chan_uc, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(2000000 sat, write(pay2wsh(Scripts.multiSig2of2(priv_funding_u.publicKey, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_uc))
|
||||
|
@ -112,13 +112,13 @@ class RouterSpec extends BaseRouterSpec {
|
|||
|
||||
{
|
||||
// duplicates
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_a)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_a))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(node_a))
|
||||
peerConnection.expectMsg(GossipDecision.Duplicate(node_a))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ab))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection.expectMsg(GossipDecision.Duplicate(chan_ab))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_ab))
|
||||
peerConnection.expectMsg(GossipDecision.Duplicate(update_ab))
|
||||
peerConnection.expectNoMsg(100 millis)
|
||||
|
@ -131,13 +131,13 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val invalid_node_a = node_a.copy(timestamp = node_a.timestamp + 10)
|
||||
val invalid_chan_a = channelAnnouncement(ShortChannelId(420000, 5, 1), priv_a, priv_c, priv_funding_a, priv_funding_c).copy(nodeId1 = randomKey.publicKey)
|
||||
val invalid_update_ab = update_ab.copy(cltvExpiryDelta = CltvExpiryDelta(21), timestamp = update_ab.timestamp + 1)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_node_a)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_node_a))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(invalid_node_a))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_node_a))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_chan_a)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_chan_a))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(invalid_chan_a))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_chan_a))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, invalid_update_ab)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, invalid_update_ab))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(invalid_update_ab))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(invalid_update_ab))
|
||||
peerConnection.expectNoMsg(100 millis)
|
||||
|
@ -151,7 +151,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val priv_funding_v = randomKey
|
||||
val chan_vc = channelAnnouncement(ShortChannelId(420000, 7, 0), priv_v, priv_c, priv_funding_v, priv_funding_c)
|
||||
nodeParams.db.network.addToPruned(chan_vc.shortChannelId :: Nil)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_vc)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_vc))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_vc))
|
||||
peerConnection.expectMsg(GossipDecision.ChannelPruned(chan_vc))
|
||||
peerConnection.expectNoMsg(100 millis)
|
||||
|
@ -162,7 +162,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
{
|
||||
// stale channel update
|
||||
val update_ab = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_b.publicKey, chan_ab.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = (Platform.currentTime.milliseconds - 15.days).toSeconds)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ab))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_ab))
|
||||
peerConnection.expectMsg(GossipDecision.Stale(update_ab))
|
||||
peerConnection.expectNoMsg(100 millis)
|
||||
|
@ -175,10 +175,10 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val priv_y = randomKey
|
||||
val update_ay = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_y.publicKey, ShortChannelId(4646464), CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat)
|
||||
val node_y = makeNodeAnnouncement(priv_y, "node-Y", Color(123, 100, -40), Nil, hex"0200")
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ay)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ay))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_ay))
|
||||
peerConnection.expectMsg(GossipDecision.NoRelatedChannel(update_ay))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_y)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_y))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(node_y))
|
||||
peerConnection.expectMsg(GossipDecision.NoKnownChannel(node_y))
|
||||
peerConnection.expectNoMsg(100 millis)
|
||||
|
@ -193,11 +193,11 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val chan_ay = channelAnnouncement(ShortChannelId(42002), priv_a, priv_y, priv_funding_a, priv_funding_y)
|
||||
val update_ay = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_y.publicKey, chan_ay.shortChannelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat)
|
||||
val node_y = makeNodeAnnouncement(priv_y, "node-Y", Color(123, 100, -40), Nil, hex"0200")
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ay)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ay))
|
||||
watcher.expectMsg(ValidateRequest(chan_ay))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ay)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, update_ay))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update_ay))
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, node_y)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, node_y))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(node_y))
|
||||
watcher.send(router, ValidateResult(chan_ay, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, randomKey.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_ay))
|
||||
|
@ -213,7 +213,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
// validation failure
|
||||
val priv_x = randomKey
|
||||
val chan_ax = channelAnnouncement(ShortChannelId(42001), priv_a, priv_x, priv_funding_a, randomKey)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ax)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_ax))
|
||||
watcher.expectMsg(ValidateRequest(chan_ax))
|
||||
watcher.send(router, ValidateResult(chan_ax, Left(new RuntimeException("funding tx not found"))))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_ax))
|
||||
|
@ -228,7 +228,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val priv_z = randomKey
|
||||
val priv_funding_z = randomKey
|
||||
val chan_az = channelAnnouncement(ShortChannelId(42003), priv_a, priv_z, priv_funding_a, priv_funding_z)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_az)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_az))
|
||||
watcher.expectMsg(ValidateRequest(chan_az))
|
||||
watcher.send(router, ValidateResult(chan_az, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_z.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Spent(spendingTxConfirmed = false))))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_az))
|
||||
|
@ -243,7 +243,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val priv_z = randomKey
|
||||
val priv_funding_z = randomKey
|
||||
val chan_az = channelAnnouncement(ShortChannelId(42003), priv_a, priv_z, priv_funding_a, priv_funding_z)
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_az)
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, chan_az))
|
||||
watcher.expectMsg(ValidateRequest(chan_az))
|
||||
watcher.send(router, ValidateResult(chan_az, Right(Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_z.publicKey)))) :: Nil, lockTime = 0), UtxoStatus.Spent(spendingTxConfirmed = true))))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(chan_az))
|
||||
|
@ -284,20 +284,20 @@ class RouterSpec extends BaseRouterSpec {
|
|||
|
||||
test("handle bad signature for ChannelAnnouncement") { fixture =>
|
||||
import fixture._
|
||||
val sender = TestProbe()
|
||||
val peerConnection = TestProbe()
|
||||
val channelId_ac = ShortChannelId(420000, 5, 0)
|
||||
val chan_ac = channelAnnouncement(channelId_ac, priv_a, priv_c, priv_funding_a, priv_funding_c)
|
||||
val buggy_chan_ac = chan_ac.copy(nodeSignature1 = chan_ac.nodeSignature2)
|
||||
sender.send(router, PeerRoutingMessage(sender.ref, remoteNodeId, buggy_chan_ac))
|
||||
sender.expectMsg(TransportHandler.ReadAck(buggy_chan_ac))
|
||||
sender.expectMsg(GossipDecision.InvalidSignature(buggy_chan_ac))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_chan_ac))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(buggy_chan_ac))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_chan_ac))
|
||||
}
|
||||
|
||||
test("handle bad signature for NodeAnnouncement") { fixture =>
|
||||
import fixture._
|
||||
val peerConnection = TestProbe()
|
||||
val buggy_ann_a = node_a.copy(signature = node_b.signature, timestamp = node_a.timestamp + 1)
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, buggy_ann_a))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_ann_a))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(buggy_ann_a))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_ann_a))
|
||||
}
|
||||
|
@ -306,7 +306,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
import fixture._
|
||||
val peerConnection = TestProbe()
|
||||
val buggy_channelUpdate_ab = update_ab.copy(signature = node_b.signature, timestamp = update_ab.timestamp + 1)
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, buggy_channelUpdate_ab))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, buggy_channelUpdate_ab))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(buggy_channelUpdate_ab))
|
||||
peerConnection.expectMsg(GossipDecision.InvalidSignature(buggy_channelUpdate_ab))
|
||||
}
|
||||
|
@ -369,7 +369,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
assert(res.hops.last.nextNodeId === d)
|
||||
|
||||
val channelUpdate_cd1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, CltvExpiryDelta(3), 0 msat, 153000 msat, 4, 500000000L msat, enable = false)
|
||||
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelUpdate_cd1))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, channelUpdate_cd1))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(channelUpdate_cd1))
|
||||
sender.send(router, RouteRequest(a, d, DEFAULT_AMOUNT_MSAT, routeParams = relaxedRouteParams))
|
||||
sender.expectMsg(Failure(RouteNotFound))
|
||||
|
@ -446,27 +446,28 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val blockHeight = 400000 - 2020
|
||||
val channelId = ShortChannelId(blockHeight, 5, 0)
|
||||
val announcement = channelAnnouncement(channelId, priv_a, priv_c, priv_funding_a, priv_funding_c)
|
||||
val timestamp = (Platform.currentTime.milliseconds - 14.days - 1.day).toSeconds
|
||||
val update = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 5 msat, timestamp = timestamp)
|
||||
val probe = TestProbe()
|
||||
probe.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
probe.send(router, PeerRoutingMessage(probe.ref, remoteNodeId, announcement))
|
||||
val oldTimestamp = (Platform.currentTime.milliseconds - 14.days - 1.day).toSeconds
|
||||
val staleUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 5 msat, timestamp = oldTimestamp)
|
||||
val peerConnection = TestProbe()
|
||||
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, announcement))
|
||||
watcher.expectMsgType[ValidateRequest]
|
||||
probe.send(router, PeerRoutingMessage(probe.ref, remoteNodeId, update))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, staleUpdate))
|
||||
watcher.send(router, ValidateResult(announcement, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
|
||||
peerConnection.expectMsg(GossipDecision.Accepted(announcement))
|
||||
peerConnection.expectMsg(GossipDecision.Stale(staleUpdate))
|
||||
|
||||
val probe = TestProbe()
|
||||
probe.send(router, TickPruneStaleChannels)
|
||||
val sender = TestProbe()
|
||||
sender.send(router, GetRoutingState)
|
||||
sender.expectMsgType[RoutingState]
|
||||
|
||||
val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = Platform.currentTime.millisecond.toSeconds)
|
||||
val recentUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, CltvExpiryDelta(7), 0 msat, 766000 msat, 10, 500000000L msat, timestamp = Platform.currentTime.millisecond.toSeconds)
|
||||
|
||||
// we want to make sure that transport receives the query
|
||||
val peerConnection = TestProbe()
|
||||
probe.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1))
|
||||
peerConnection.expectMsg(TransportHandler.ReadAck(update1))
|
||||
peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update1))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, recentUpdate))
|
||||
peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(recentUpdate))
|
||||
val query = peerConnection.expectMsgType[QueryShortChannelIds]
|
||||
assert(query.shortChannelIds.array == List(channelId))
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import fr.acinq.eclair.crypto.TransportHandler
|
|||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.router.Announcements.{makeChannelUpdate, makeNodeAnnouncement}
|
||||
import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement
|
||||
import fr.acinq.eclair.router.Router.{Data, GossipDecision, PublicChannel, SendChannelQuery, State}
|
||||
import fr.acinq.eclair.router.Router._
|
||||
import fr.acinq.eclair.router.Sync._
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.wire._
|
||||
|
@ -74,7 +74,6 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
}
|
||||
|
||||
def sync(src: TestFSMRef[State, Data, Router], tgt: TestFSMRef[State, Data, Router], extendedQueryFlags_opt: Option[QueryChannelRangeTlv]): SyncResult = {
|
||||
val sender = TestProbe()
|
||||
val pipe = TestProbe()
|
||||
pipe.ignoreMsg {
|
||||
case _: TransportHandler.ReadAck => true
|
||||
|
@ -84,10 +83,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
}
|
||||
val srcId = src.underlyingActor.nodeParams.nodeId
|
||||
val tgtId = tgt.underlyingActor.nodeParams.nodeId
|
||||
sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, extendedQueryFlags_opt))
|
||||
pipe.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, extendedQueryFlags_opt))
|
||||
// src sends a query_channel_range to bob
|
||||
val qcr = pipe.expectMsgType[QueryChannelRange]
|
||||
pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr))
|
||||
pipe.send(tgt, PeerRoutingMessage(srcId, qcr))
|
||||
// this allows us to know when the last reply_channel_range has been set
|
||||
pipe.send(tgt, 'data)
|
||||
// tgt answers with reply_channel_ranges
|
||||
|
@ -95,7 +94,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
case rcr: ReplyChannelRange => rcr
|
||||
}
|
||||
pipe.expectMsgType[Data]
|
||||
rcrs.foreach(rcr => pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, rcr)))
|
||||
rcrs.foreach(rcr => pipe.send(src, PeerRoutingMessage(tgtId, rcr)))
|
||||
// then src will now query announcements
|
||||
var queries = Vector.empty[QueryShortChannelIds]
|
||||
var channels = Vector.empty[ChannelAnnouncement]
|
||||
|
@ -104,7 +103,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
while (src.stateData.sync.nonEmpty) {
|
||||
// for each chunk, src sends a query_short_channel_id
|
||||
val query = pipe.expectMsgType[QueryShortChannelIds]
|
||||
pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, query))
|
||||
pipe.send(tgt, PeerRoutingMessage(srcId, query))
|
||||
queries = queries :+ query
|
||||
val announcements = pipe.receiveWhile() {
|
||||
case c: ChannelAnnouncement =>
|
||||
|
@ -118,10 +117,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
n
|
||||
}
|
||||
// tgt replies with announcements
|
||||
announcements.foreach(ann => pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, ann)))
|
||||
announcements.foreach(ann => pipe.send(src, PeerRoutingMessage(tgtId, ann)))
|
||||
// and tgt ends this chunk with a reply_short_channel_id_end
|
||||
val rscie = pipe.expectMsgType[ReplyShortChannelIdsEnd]
|
||||
pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, rscie))
|
||||
pipe.send(src, PeerRoutingMessage(tgtId, rscie))
|
||||
}
|
||||
SyncResult(rcrs, queries, channels, updates, nodes)
|
||||
}
|
||||
|
@ -146,11 +145,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add some channels and updates to bob and resync
|
||||
fakeRoutingInfo.take(10).values.foreach {
|
||||
case (pc, na1, na2) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get))
|
||||
// we don't send channel_update #2
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na2))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10)
|
||||
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -159,7 +158,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add some updates to bob and resync
|
||||
fakeRoutingInfo.take(10).values.foreach {
|
||||
case (pc, _, _) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2)
|
||||
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10 * 2, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -168,11 +167,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add everything (duplicates will be ignored)
|
||||
fakeRoutingInfo.values.foreach {
|
||||
case (pc, na1, na2) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na2))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds)
|
||||
assert(BasicSyncResult(ranges = 3, queries = 13, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -194,11 +193,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add some channels and updates to bob and resync
|
||||
fakeRoutingInfo.take(10).values.foreach {
|
||||
case (pc, na1, na2) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get))
|
||||
// we don't send channel_update #2
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na2))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10)
|
||||
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -208,7 +207,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add some updates to bob and resync
|
||||
fakeRoutingInfo.take(10).values.foreach {
|
||||
case (pc, _, _) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2)
|
||||
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 0, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -217,11 +216,11 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
// add everything (duplicates will be ignored)
|
||||
fakeRoutingInfo.values.foreach {
|
||||
case (pc, na1, na2) =>
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.ann))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_1_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, pc.update_2_opt.get))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na1))
|
||||
sender.send(bob, PeerRoutingMessage(charlieId, na2))
|
||||
}
|
||||
awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds)
|
||||
assert(BasicSyncResult(ranges = 3, queries = 11, channels = fakeRoutingInfo.size - 10, updates = 2 * (fakeRoutingInfo.size - 10), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 10) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
|
@ -234,7 +233,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
}
|
||||
|
||||
val bumpedUpdates = (List(0, 3, 7).map(touchUpdate(_, true)) ++ List(1, 3, 9).map(touchUpdate(_, false))).toSet
|
||||
bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c)))
|
||||
bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(charlieId, c)))
|
||||
assert(BasicSyncResult(ranges = 3, queries = 1, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 5 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
|
||||
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)
|
||||
if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes)
|
||||
|
@ -251,29 +250,30 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
|||
test("reset sync state on reconnection") {
|
||||
val params = TestConstants.Alice.nodeParams
|
||||
val router = TestFSMRef(new Router(params, TestProbe().ref))
|
||||
val transport = TestProbe()
|
||||
val peerConnection = TestProbe()
|
||||
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
val sender = TestProbe()
|
||||
sender.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
val remoteNodeId = TestConstants.Bob.nodeParams.nodeId
|
||||
|
||||
// ask router to send a channel range query
|
||||
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None))
|
||||
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, None))
|
||||
val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange]
|
||||
sender.expectMsgType[GossipTimestampFilter]
|
||||
|
||||
val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None)
|
||||
|
||||
// send first block
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, block1))
|
||||
peerConnection.send(router, PeerRoutingMessage(remoteNodeId, block1))
|
||||
|
||||
// router should ask for our first block of ids
|
||||
assert(transport.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty))
|
||||
assert(peerConnection.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty))
|
||||
// router should think that it is missing 100 channels, in one request
|
||||
val Some(sync) = router.stateData.sync.get(remoteNodeId)
|
||||
assert(sync.total == 1)
|
||||
|
||||
// simulate a re-connection
|
||||
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None))
|
||||
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, None))
|
||||
sender.expectMsgType[QueryChannelRange]
|
||||
sender.expectMsgType[GossipTimestampFilter]
|
||||
assert(router.stateData.sync.get(remoteNodeId).isEmpty)
|
||||
|
|
Loading…
Add table
Reference in a new issue