mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-21 22:11:46 +01:00
Improve postman (#2147)
Makes the code a bit cleaner and fixes a bug where `Postman` could respond with both a failure to send and later a `NoReply` after the timeout in case we were expecting a reply.
This commit is contained in:
parent
4307bada51
commit
fa31d81d0e
2 changed files with 32 additions and 35 deletions
|
@ -53,59 +53,54 @@ object Postman {
|
|||
|
||||
val relayMessageStatusAdapter = context.messageAdapter[MessageRelay.Status](SendingStatus)
|
||||
|
||||
// For messages expecting a reply, send reply or failure to send
|
||||
val subscribed = new mutable.HashMap[ByteVector32, ActorRef[OnionMessageResponse]]()
|
||||
|
||||
// For messages not expecting a reply, send success or failure to send
|
||||
val sendStatusTo = new mutable.HashMap[ByteVector32, ActorRef[OnionMessageResponse]]()
|
||||
val sendFailureTo = new mutable.HashMap[ByteVector32, ActorRef[OnionMessageResponse]]()
|
||||
|
||||
Behaviors.receiveMessagePartial {
|
||||
case WrappedMessage(finalPayload, Some(pathId)) if pathId.length == 32 =>
|
||||
subscribed.get(ByteVector32(pathId)) match {
|
||||
case Some(ref) =>
|
||||
subscribed -= ByteVector32(pathId)
|
||||
ref ! Response(finalPayload)
|
||||
case None => () // ignoring message with unknown pathId
|
||||
}
|
||||
val id = ByteVector32(pathId)
|
||||
subscribed.get(id).foreach(ref => {
|
||||
subscribed -= id
|
||||
ref ! Response(finalPayload)
|
||||
})
|
||||
Behaviors.same
|
||||
case WrappedMessage(_, _) =>
|
||||
// ignoring message with invalid or missing pathId
|
||||
Behaviors.same
|
||||
case SendMessage(nextNodeId, message, None, ref, _) =>
|
||||
case SendMessage(nextNodeId, message, None, ref, _) => // not expecting reply
|
||||
val messageId = randomBytes32()
|
||||
sendStatusTo += (messageId -> ref)
|
||||
switchboard ! Switchboard.RelayMessage(messageId, None, nextNodeId, message, MessageRelay.RelayAll, Some(relayMessageStatusAdapter))
|
||||
Behaviors.same
|
||||
case SendMessage(nextNodeId, message, Some(pathId), ref, timeout) =>
|
||||
val messageId = randomBytes32()
|
||||
sendFailureTo += (messageId -> ref)
|
||||
case SendMessage(nextNodeId, message, Some(pathId), ref, timeout) => // expecting reply
|
||||
subscribed += (pathId -> ref)
|
||||
context.scheduleOnce(timeout, context.self, Unsubscribe(pathId))
|
||||
switchboard ! Switchboard.RelayMessage(messageId, None, nextNodeId, message, MessageRelay.RelayAll, Some(relayMessageStatusAdapter))
|
||||
switchboard ! Switchboard.RelayMessage(pathId, None, nextNodeId, message, MessageRelay.RelayAll, Some(relayMessageStatusAdapter))
|
||||
Behaviors.same
|
||||
case Unsubscribe(pathId) =>
|
||||
subscribed.get(pathId).foreach(_ ! NoReply)
|
||||
subscribed -= pathId
|
||||
subscribed.get(pathId).foreach(ref => {
|
||||
subscribed -= pathId
|
||||
ref ! NoReply
|
||||
})
|
||||
Behaviors.same
|
||||
case status@SendingStatus(MessageRelay.Sent(messageId)) =>
|
||||
sendStatusTo.get(messageId) match {
|
||||
case Some(ref) =>
|
||||
sendStatusTo -= messageId
|
||||
ref ! status
|
||||
case None => ()
|
||||
}
|
||||
sendStatusTo.get(messageId).foreach(ref => {
|
||||
sendStatusTo -= messageId
|
||||
ref ! status
|
||||
})
|
||||
Behaviors.same
|
||||
case SendingStatus(status: MessageRelay.Failure) =>
|
||||
sendStatusTo.get(status.messageId) match {
|
||||
case Some(ref) =>
|
||||
sendStatusTo -= status.messageId
|
||||
ref ! SendingStatus(status)
|
||||
case None => ()
|
||||
}
|
||||
sendFailureTo.get(status.messageId) match {
|
||||
case Some(ref) =>
|
||||
sendFailureTo -= status.messageId
|
||||
ref ! SendingStatus(status)
|
||||
case None => ()
|
||||
}
|
||||
sendStatusTo.get(status.messageId).foreach(ref => {
|
||||
sendStatusTo -= status.messageId
|
||||
ref ! SendingStatus(status)
|
||||
})
|
||||
subscribed.get(status.messageId).foreach(ref => {
|
||||
subscribed -= status.messageId
|
||||
ref ! SendingStatus(status)
|
||||
})
|
||||
Behaviors.same
|
||||
}
|
||||
})
|
||||
|
|
|
@ -59,7 +59,7 @@ class PostmanSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
val (_, messageExpectingReply) = OnionMessages.buildMessage(randomKey(), randomKey(), Nil, OnionMessages.Recipient(recipient, None), ReplyPath(replyPath) :: Nil)
|
||||
val payload = FinalPayload(TlvStream(EncryptedData(replyPath.encryptedPayloads.last) :: Nil, GenericTlv(UInt64(42), hex"abcd") :: Nil))
|
||||
|
||||
postman ! SendMessage(recipient, messageExpectingReply, Some(pathId), messageRecipient.ref, 1 second)
|
||||
postman ! SendMessage(recipient, messageExpectingReply, Some(pathId), messageRecipient.ref, 100 millis)
|
||||
|
||||
val RelayMessage(messageId, _, nextNodeId, message, _, _) = switchboard.expectMessageType[RelayMessage]
|
||||
assert(nextNodeId === recipient)
|
||||
|
@ -81,7 +81,7 @@ class PostmanSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
val replyPath = OnionMessages.buildRoute(randomKey(), Nil, OnionMessages.Recipient(ourNodeId, Some(pathId)))
|
||||
val (_, messageExpectingReply) = OnionMessages.buildMessage(randomKey(), randomKey(), Nil, OnionMessages.Recipient(recipient, None), ReplyPath(replyPath) :: Nil)
|
||||
|
||||
postman ! SendMessage(recipient, messageExpectingReply, Some(pathId), messageRecipient.ref, 1 second)
|
||||
postman ! SendMessage(recipient, messageExpectingReply, Some(pathId), messageRecipient.ref, 100 millis)
|
||||
|
||||
val RelayMessage(messageId, _, nextNodeId, message, _, _) = switchboard.expectMessageType[RelayMessage]
|
||||
assert(nextNodeId === recipient)
|
||||
|
@ -89,6 +89,7 @@ class PostmanSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
postman ! SendingStatus(Disconnected(messageId))
|
||||
|
||||
messageRecipient.expectMessage(SendingStatus(Disconnected(messageId)))
|
||||
messageRecipient.expectNoMessage()
|
||||
}
|
||||
|
||||
test("timeout") { f =>
|
||||
|
@ -121,7 +122,7 @@ class PostmanSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
val recipient = randomKey().publicKey
|
||||
val (_, messageExpectingReply) = OnionMessages.buildMessage(randomKey(), randomKey(), Nil, OnionMessages.Recipient(recipient, None), Nil)
|
||||
|
||||
postman ! SendMessage(recipient, messageExpectingReply, None, messageRecipient.ref, 1 second)
|
||||
postman ! SendMessage(recipient, messageExpectingReply, None, messageRecipient.ref, 100 millis)
|
||||
|
||||
val RelayMessage(messageId, _, nextNodeId, message, _, _) = switchboard.expectMessageType[RelayMessage]
|
||||
assert(nextNodeId === recipient)
|
||||
|
@ -129,5 +130,6 @@ class PostmanSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
postman ! SendingStatus(Sent(messageId))
|
||||
|
||||
messageRecipient.expectMessage(SendingStatus(Sent(messageId)))
|
||||
messageRecipient.expectNoMessage()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue