mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-23 06:35:11 +01:00
Add replyTo
field to Router.GetNode (#2575)
This commit is contained in:
parent
06fcb1f632
commit
9d4f2baedf
4 changed files with 20 additions and 15 deletions
|
@ -17,8 +17,9 @@
|
|||
package fr.acinq.eclair
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.typed.Scheduler
|
||||
import akka.actor.typed.scaladsl.AskPattern.Askable
|
||||
import akka.actor.typed.scaladsl.adapter.ClassicSchedulerOps
|
||||
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicSchedulerOps}
|
||||
import akka.pattern._
|
||||
import akka.util.Timeout
|
||||
import com.softwaremill.quicklens.ModifyPimp
|
||||
|
@ -169,6 +170,7 @@ trait Eclair {
|
|||
class EclairImpl(appKit: Kit) extends Eclair with Logging {
|
||||
|
||||
implicit val ec: ExecutionContext = appKit.system.dispatcher
|
||||
implicit val scheduler: Scheduler = appKit.system.scheduler.toTyped
|
||||
|
||||
// We constrain external identifiers. This allows uuid, long and pubkey to be used.
|
||||
private val externalIdMaxLength = 66
|
||||
|
@ -225,7 +227,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
|
|||
} yield peerinfos
|
||||
|
||||
override def node(nodeId: PublicKey)(implicit timeout: Timeout): Future[Option[Router.PublicNode]] = {
|
||||
(appKit.router ? Router.GetNode(nodeId)).mapTo[Router.GetNodeResponse].map {
|
||||
appKit.router.toTyped.ask(ref => Router.GetNode(ref, nodeId)).map {
|
||||
case n: PublicNode => Some(n)
|
||||
case _: UnknownNode => None
|
||||
}
|
||||
|
@ -516,8 +518,8 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
|
|||
|
||||
override def globalBalance()(implicit timeout: Timeout): Future[GlobalBalance] = {
|
||||
for {
|
||||
ChannelsListener.GetChannelsResponse(channels) <- appKit.channelsListener.ask(ref => ChannelsListener.GetChannels(ref))(timeout, appKit.system.scheduler.toTyped)
|
||||
globalBalance_try <- appKit.balanceActor.ask(res => BalanceActor.GetGlobalBalance(res, channels))(timeout, appKit.system.scheduler.toTyped)
|
||||
ChannelsListener.GetChannelsResponse(channels) <- appKit.channelsListener.ask(ref => ChannelsListener.GetChannels(ref))
|
||||
globalBalance_try <- appKit.balanceActor.ask(res => BalanceActor.GetGlobalBalance(res, channels))
|
||||
globalBalance <- Promise[GlobalBalance]().complete(globalBalance_try).future
|
||||
} yield globalBalance
|
||||
}
|
||||
|
@ -568,7 +570,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
|
|||
case Left(key) => OnionMessages.Recipient(key, None)
|
||||
case Right(route) => OnionMessages.BlindedPath(route)
|
||||
}
|
||||
appKit.postman.ask(ref => Postman.SendMessage(intermediateNodes, destination, replyPath, userTlvs, ref, appKit.nodeParams.onionMessageConfig.timeout))(timeout, appKit.system.scheduler.toTyped).mapTo[Postman.OnionMessageResponse].map {
|
||||
appKit.postman.ask(ref => Postman.SendMessage(intermediateNodes, destination, replyPath, userTlvs, ref, appKit.nodeParams.onionMessageConfig.timeout)).mapTo[Postman.OnionMessageResponse].map {
|
||||
case Postman.Response(payload) =>
|
||||
val encodedReplyPath = payload.replyPath_opt.map(route => blindedRouteCodec.encode(route).require.bytes.toHex)
|
||||
SendOnionMessageResponse(sent = true, None, Some(SendOnionMessageResponsePayload(encodedReplyPath, payload.replyPath_opt, payload.records.unknown.map(tlv => tlv.tag.toString -> tlv.value).toMap)))
|
||||
|
|
|
@ -201,15 +201,15 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
|
|||
sender() ! d.excludedChannels
|
||||
stay()
|
||||
|
||||
case Event(GetNode(nodeId), d) =>
|
||||
case Event(GetNode(replyTo, nodeId), d) =>
|
||||
d.nodes.get(nodeId) match {
|
||||
case Some(announcement) =>
|
||||
// This only provides a lower bound on the number of channels this peer has: disabled channels will be filtered out.
|
||||
val activeChannels = d.graphWithBalances.graph.getIncomingEdgesOf(nodeId)
|
||||
val totalCapacity = activeChannels.map(_.capacity).sum
|
||||
sender() ! PublicNode(announcement, activeChannels.size, totalCapacity)
|
||||
replyTo ! PublicNode(announcement, activeChannels.size, totalCapacity)
|
||||
case None =>
|
||||
sender() ! UnknownNode(nodeId)
|
||||
replyTo ! UnknownNode(nodeId)
|
||||
}
|
||||
stay()
|
||||
|
||||
|
@ -670,7 +670,7 @@ object Router {
|
|||
case object GetChannelsMap
|
||||
case object GetChannelUpdates
|
||||
|
||||
case class GetNode(nodeId: PublicKey)
|
||||
case class GetNode(replyTo: typed.ActorRef[GetNodeResponse], nodeId: PublicKey)
|
||||
sealed trait GetNodeResponse
|
||||
case class PublicNode(announcement: NodeAnnouncement, activeChannels: Int, totalCapacity: Satoshi) extends GetNodeResponse
|
||||
case class UnknownNode(nodeId: PublicKey) extends GetNodeResponse
|
||||
|
|
|
@ -158,14 +158,16 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
|
|||
val ann = NodeAnnouncement(randomBytes64(), Features.empty, TimestampSecond(42L), randomKey().publicKey, Color(42, 42, 42), "ACINQ", Nil)
|
||||
val remoteNode = Router.PublicNode(ann, 7, 561_000 sat)
|
||||
eclair.node(ann.nodeId).pipeTo(sender.ref)
|
||||
assert(router.expectMsgType[Router.GetNode].nodeId == ann.nodeId)
|
||||
router.reply(remoteNode)
|
||||
val msg1 = router.expectMsgType[Router.GetNode]
|
||||
assert(msg1.nodeId == ann.nodeId)
|
||||
msg1.replyTo ! remoteNode
|
||||
sender.expectMsg(Some(remoteNode))
|
||||
|
||||
val unknownNode = Router.UnknownNode(randomKey().publicKey)
|
||||
eclair.node(unknownNode.nodeId).pipeTo(sender.ref)
|
||||
assert(router.expectMsgType[Router.GetNode].nodeId == unknownNode.nodeId)
|
||||
router.reply(unknownNode)
|
||||
val msg2 = router.expectMsgType[Router.GetNode]
|
||||
assert(msg2.nodeId == unknownNode.nodeId)
|
||||
msg2.replyTo ! unknownNode
|
||||
sender.expectMsg(None)
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package fr.acinq.eclair.router
|
|||
|
||||
import akka.actor.Status
|
||||
import akka.actor.Status.Failure
|
||||
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
|
||||
import akka.testkit.TestProbe
|
||||
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
|
||||
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
|
||||
|
@ -311,9 +312,9 @@ class RouterSpec extends BaseRouterSpec {
|
|||
|
||||
val probe = TestProbe()
|
||||
val unknownNodeId = randomKey().publicKey
|
||||
probe.send(router, GetNode(unknownNodeId))
|
||||
router ! GetNode(probe.ref.toTyped, unknownNodeId)
|
||||
probe.expectMsg(UnknownNode(unknownNodeId))
|
||||
probe.send(router, GetNode(b))
|
||||
router ! GetNode(probe.ref.toTyped, b)
|
||||
probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2))
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue