Eclair Web Socket client (#1006)

* Eclair Web Socket client

* fix build error

* unit test
This commit is contained in:
rorp 2020-01-07 16:25:35 -08:00 committed by Chris Stewart
parent 0421076b21
commit c854a96b2a
5 changed files with 159 additions and 35 deletions

View File

@ -3,7 +3,7 @@ package org.bitcoins.eclair.rpc
import java.nio.file.Files
import org.bitcoins.core.currency.{CurrencyUnit, CurrencyUnits, Satoshis}
import org.bitcoins.core.number.{Int64, UInt64}
import org.bitcoins.core.number.UInt64
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.ln.LnParams.LnBitcoinRegTest
import org.bitcoins.core.protocol.ln.channel.{
@ -552,6 +552,12 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
_ = assert(channels.exists(_.state == ChannelState.NORMAL),
"Nodes did not have open channel!")
preimage = PaymentPreimage.random
wsEventP = Promise[WebSocketEvent]
_ <- client.connectToWebSocket({ event =>
if (!wsEventP.isCompleted) {
wsEventP.success(event)
}
})
invoice <- otherClient.createInvoice("foo", amt, preimage)
paymentId <- client.sendToNode(otherClientNodeId,
amt,
@ -560,13 +566,19 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
None,
None,
Some("ext_id"))
_ <- EclairRpcTestUtil.awaitUntilPaymentSucceeded(client, paymentId)
wsEvent <- wsEventP.future
succeeded <- client.getSentInfo(invoice.lnTags.paymentHash.hash)
_ <- client.close(channelId)
bitcoind <- bitcoindRpcClientF
address <- bitcoind.getNewAddress
_ <- bitcoind.generateToAddress(6, address)
} yield {
assert(wsEvent.isInstanceOf[WebSocketEvent.PaymentSent])
val paymentSent = wsEvent.asInstanceOf[WebSocketEvent.PaymentSent]
assert(paymentSent.parts.nonEmpty)
assert(paymentSent.id == paymentId)
assert(paymentSent.parts.head.amount == amt)
assert(paymentSent.parts.head.id == paymentId)
assert(succeeded.nonEmpty)
val succeededPayment = succeeded.head
@ -844,8 +856,9 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
}
}
def openChannel(c1: EclairRpcClient,
c2: EclairRpcClient): Future[FundedChannelId] = {
def openChannel(
c1: EclairRpcClient,
c2: EclairRpcClient): Future[FundedChannelId] = {
EclairRpcTestUtil
.openChannel(c1, c2, Satoshis(500000), MilliSatoshis(500000))
}
@ -1049,7 +1062,7 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
ourUpdates.flatMap(our =>
allUpdates.map { all =>
our != all
})
})
}
AsyncUtil
@ -1145,8 +1158,9 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
}
}
private def hasConnection(client: Future[EclairRpcClient],
nodeId: NodeId): Future[Assertion] = {
private def hasConnection(
client: Future[EclairRpcClient],
nodeId: NodeId): Future[Assertion] = {
val hasPeersF = client.flatMap(_.getPeers.map(_.nonEmpty))
@ -1161,8 +1175,9 @@ class EclairRpcClientTest extends BitcoinSAsyncTest {
}
/** Checks that the given [[org.bitcoins.eclair.rpc.client.EclairRpcClient]] has the given chanId */
private def hasChannel(client: EclairRpcClient,
chanId: ChannelId): Future[Assertion] = {
private def hasChannel(
client: EclairRpcClient,
chanId: ChannelId): Future[Assertion] = {
val recognizedOpenChannel: Future[Assertion] = {
val chanResultF: Future[ChannelResult] = client.channel(chanId)

View File

@ -262,4 +262,7 @@ trait EclairApi {
externalId: Option[String]): Future[PaymentId]
def usableBalances(): Future[Vector[UsableBalancesResult]]
/** Connects to the Eclair web socket end point and passes [[WebSocketEvent]]s to the given [[eventHandler]] */
def connectToWebSocket(eventHandler: WebSocketEvent => Unit): Future[Unit]
}

View File

@ -286,24 +286,41 @@ object WebSocketEvent {
) extends WebSocketEvent
case class PaymentReceived(
amount: MilliSatoshis,
paymentHash: Sha256Digest,
fromChannelId: FundedChannelId,
parts: Vector[PaymentReceived.Part]
) extends WebSocketEvent
object PaymentReceived {
case class Part(
amount: MilliSatoshis,
fromChannelId: FundedChannelId,
timestamp: FiniteDuration // milliseconds
)
}
case class PaymentFailed(
id: PaymentId,
paymentHash: Sha256Digest,
failures: Vector[String],
timestamp: FiniteDuration // milliseconds
) extends WebSocketEvent
case class PaymentFailed(paymentHash: Sha256Digest, failures: Vector[String])
extends WebSocketEvent
case class PaymentSent(
amount: MilliSatoshis,
feesPaid: MilliSatoshis,
id: PaymentId,
paymentHash: Sha256Digest,
paymentPreimage: PaymentPreimage,
toChannelId: FundedChannelId,
timestamp: FiniteDuration //milliseconds
parts: Vector[PaymentSent.Part]
) extends WebSocketEvent
object PaymentSent {
case class Part(
id: PaymentId,
amount: MilliSatoshis,
feesPaid: MilliSatoshis,
toChannelId: FundedChannelId,
timestamp: FiniteDuration // milliseconds
)
}
case class PaymentSettlingOnchain(
amount: MilliSatoshis,
paymentHash: Sha256Digest,

View File

@ -4,11 +4,15 @@ import java.io.File
import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.http.javadsl.model.headers.HttpCredentials
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import org.bitcoins.core.crypto.Sha256Digest
import org.bitcoins.core.currency.{CurrencyUnit, Satoshis}
@ -837,6 +841,48 @@ class EclairRpcClient(val instance: EclairInstance, binary: Option[File] = None)
f
}
/** @inheritdoc */
override def connectToWebSocket(
eventHandler: WebSocketEvent => Unit): Future[Unit] = {
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
val parsed: JsValue = Json.parse(message.text)
val validated: JsResult[WebSocketEvent] =
parsed.validate[WebSocketEvent]
val event = parseResult[WebSocketEvent](validated, parsed, "ws")
eventHandler(event)
case _: Message => ()
}
val flow =
Flow.fromSinkAndSource(incoming, Source.maybe)
val uri =
instance.rpcUri.resolve("/ws").toString.replace("http://", "ws://")
instance.authCredentials.bitcoinAuthOpt
val request = WebSocketRequest(
uri,
extraHeaders = Vector(
Authorization(
BasicHttpCredentials("", instance.authCredentials.password))))
val (upgradeResponse, _) = Http().singleWebSocketRequest(request, flow)
val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}")
}
}
connected.failed.foreach(ex =>
logger.error(s"Cannot connect to web socket $uri ", ex))
connected.map(_ => ())
}
}
object EclairRpcClient {

View File

@ -465,39 +465,66 @@ object JsonReaders {
timestamp)
}
implicit val paymentReceivedEventReads: Reads[
WebSocketEvent.PaymentReceived] = Reads { js =>
implicit val paymentReceivedEventPartReads: Reads[
WebSocketEvent.PaymentReceived.Part] = Reads { js =>
for {
amount <- (js \ "amount").validate[MilliSatoshis]
paymentHash <- (js \ "paymentHash").validate[Sha256Digest]
fromChannelId <- (js \ "fromChannelId").validate[FundedChannelId]
timestamp <- (js \ "timestamp")
.validate[FiniteDuration](finiteDurationReadsMilliseconds)
} yield WebSocketEvent.PaymentReceived(amount,
paymentHash,
fromChannelId,
timestamp)
} yield WebSocketEvent.PaymentReceived.Part(amount,
fromChannelId,
timestamp)
}
implicit val paymentReceivedEventReads: Reads[
WebSocketEvent.PaymentReceived] = Reads { js =>
for {
paymentHash <- (js \ "paymentHash").validate[Sha256Digest]
parts <- (js \ "parts")
.validate[Vector[WebSocketEvent.PaymentReceived.Part]]
} yield WebSocketEvent.PaymentReceived(paymentHash, parts)
}
implicit val paymentFailedEventReads: Reads[WebSocketEvent.PaymentFailed] =
Json.reads[WebSocketEvent.PaymentFailed]
Reads { js =>
for {
id <- (js \ "id").validate[PaymentId]
paymentHash <- (js \ "paymentHash").validate[Sha256Digest]
failures <- (js \ "failures").validate[Vector[String]]
timestamp <- (js \ "timestamp")
.validate[FiniteDuration](finiteDurationReadsMilliseconds)
} yield WebSocketEvent.PaymentFailed(id, paymentHash, failures, timestamp)
}
implicit val paymentSentEventPartReads: Reads[
WebSocketEvent.PaymentSent.Part] = Reads { js =>
for {
id <- (js \ "id").validate[PaymentId]
amount <- (js \ "amount").validate[MilliSatoshis]
feesPaid <- (js \ "feesPaid").validate[MilliSatoshis]
toChannelId <- (js \ "toChannelId").validate[FundedChannelId]
timestamp <- (js \ "timestamp")
.validate[FiniteDuration](finiteDurationReadsMilliseconds)
} yield WebSocketEvent.PaymentSent.Part(id,
amount,
feesPaid,
toChannelId,
timestamp)
}
implicit val paymentSentEventReads: Reads[WebSocketEvent.PaymentSent] =
Reads { js =>
for {
amount <- (js \ "amount").validate[MilliSatoshis]
feesPaid <- (js \ "feesPaid").validate[MilliSatoshis]
id <- (js \ "id").validate[PaymentId]
paymentHash <- (js \ "paymentHash").validate[Sha256Digest]
paymentPreimage <- (js \ "paymentPreimage").validate[PaymentPreimage]
toChannelId <- (js \ "toChannelId").validate[FundedChannelId]
timestamp <- (js \ "timestamp")
.validate[FiniteDuration](finiteDurationReadsMilliseconds)
} yield WebSocketEvent.PaymentSent(amount,
feesPaid,
parts <- (js \ "parts")
.validate[Vector[WebSocketEvent.PaymentSent.Part]]
} yield WebSocketEvent.PaymentSent(id,
paymentHash,
paymentPreimage,
toChannelId,
timestamp)
parts)
}
implicit val paymentSettlingOnchainEventReads: Reads[
@ -512,4 +539,20 @@ object JsonReaders {
timestamp)
}
implicit val webSocketEventReads: Reads[WebSocketEvent] =
Reads { js =>
(js \ "type")
.validate[String]
.flatMap {
case "payment-relayed" => js.validate[WebSocketEvent.PaymentRelayed]
case "payment-received" => js.validate[WebSocketEvent.PaymentReceived]
case "payment-failed" =>
js.validate[WebSocketEvent.PaymentFailed]
case "payment-sent" =>
js.validate[WebSocketEvent.PaymentSent]
case "payment-settling-onchain" =>
js.validate[WebSocketEvent.PaymentSettlingOnchain]
}
}
}