From b6b443193568358e4d6bf41a24415660b946314f Mon Sep 17 00:00:00 2001 From: rorp Date: Mon, 15 Jul 2019 09:18:48 -0700 Subject: [PATCH] Revert payment monitor (#600) * Revert payment monitor * Revert payment monitor * expose EcecutionContext in EclairApi * more configurable payment monitoring * cleanup, more tests * addressed comments --- .../eclair/rpc/EclairRpcClientTest.scala | 21 +- .../bitcoins/eclair/rpc/api/EclairApi.scala | 29 +- .../eclair/rpc/client/EclairRpcClient.scala | 267 ++++++++++++++---- 3 files changed, 253 insertions(+), 64 deletions(-) diff --git a/eclair-rpc-test/src/test/scala/org/bitcoins/eclair/rpc/EclairRpcClientTest.scala b/eclair-rpc-test/src/test/scala/org/bitcoins/eclair/rpc/EclairRpcClientTest.scala index e6c28454bc..c719441581 100644 --- a/eclair-rpc-test/src/test/scala/org/bitcoins/eclair/rpc/EclairRpcClientTest.scala +++ b/eclair-rpc-test/src/test/scala/org/bitcoins/eclair/rpc/EclairRpcClientTest.scala @@ -170,6 +170,23 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll { AsyncUtil.awaitConditionF(hasRoute, duration = 10.seconds, maxTries = 20).map(_ => succeed) } + it should "pay an invoice and monitor the payment" in { + val checkPayment = { + (client: EclairRpcClient, otherClient: EclairRpcClient) => + + for { + _ <- openAndConfirmChannel(clientF, otherClientF) + invoice <- otherClient.createInvoice("abc", 50.msats) + paymentResult <- client.payAndMonitorInvoice(invoice, 1.second, 10) + } yield { + assert(paymentResult.amountMsat == 50.msats) + } + } + + executeWithClientOtherClient(checkPayment) + } + + it should "check a payment" in { val checkPayment = { (client: EclairRpcClient, otherClient: EclairRpcClient) => @@ -532,8 +549,8 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll { { for { channelId <- openAndConfirmChannel(clientF, otherClientF) - invoice <- otherClient.createInvoice("no amount", None, None, None, None) - paymentId <- client.payInvoice(invoice, Some(amt), None, None, None) + invoice <- otherClient.createInvoice("no amount") + paymentId <- client.payInvoice(invoice, amt) _ <- EclairRpcTestUtil.awaitUntilPaymentSucceeded(client, paymentId) succeeded <- client.getSentInfo(invoice.lnTags.paymentHash.hash) _ <- client.close(channelId) diff --git a/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/api/EclairApi.scala b/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/api/EclairApi.scala index cc9ab82791..13713faf23 100644 --- a/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/api/EclairApi.scala +++ b/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/api/EclairApi.scala @@ -12,7 +12,7 @@ import org.bitcoins.core.wallet.fee.SatoshisPerByte import org.bitcoins.eclair.rpc.json._ import org.bitcoins.eclair.rpc.network.NodeUri -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} /** @@ -22,6 +22,8 @@ import scala.concurrent.{ExecutionContext, Future} */ trait EclairApi { + implicit def executionContext: ExecutionContext + def allChannels(): Future[Vector[ChannelDesc]] def allNodes(): Future[Vector[NodeInfo]] @@ -108,7 +110,7 @@ trait EclairApi { def networkFees(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]] - def nodeId()(implicit ec: ExecutionContext): Future[NodeId] = { + def nodeId(): Future[NodeId] = { getNodeURI.map(_.nodeId) } @@ -136,6 +138,29 @@ trait EclairApi { def payInvoice(invoice: LnInvoice, amountMsat: Option[MilliSatoshis], maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId] + /** + * Pings eclair to see if a invoice has been paid and returns [[org.bitcoins.eclair.rpc.json.PaymentResult PaymentResult]] + * + * @param paymentId the payment id returnned by [[org.bitcoins.eclair.rpc.api.EclairApi.payInvoice()]] + * @param interval the ping interval + * @param maxAttempts the maximum number of pings + * + */ + def monitorSentPayment(paymentId: PaymentId, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult] + + def payAndMonitorInvoice(invoice: LnInvoice, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult] = + for { + paymentId <- payInvoice(invoice) + paymentResult <- monitorSentPayment(paymentId, interval, maxAttempts) + } yield paymentResult + + + def payAndMonitorInvoice(invoice: LnInvoice, amount: MilliSatoshis, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult] = + for { + paymentId <- payInvoice(invoice, amount) + paymentResult <- monitorSentPayment(paymentId, interval, maxAttempts) + } yield paymentResult + def getSentInfo(paymentHash: Sha256Digest): Future[Vector[PaymentResult]] def getSentInfo(id: PaymentId): Future[Vector[PaymentResult]] diff --git a/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/client/EclairRpcClient.scala b/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/client/EclairRpcClient.scala index c220e876e0..045e80d44c 100644 --- a/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/client/EclairRpcClient.scala +++ b/eclair-rpc/src/main/scala/org/bitcoins/eclair/rpc/client/EclairRpcClient.scala @@ -1,6 +1,7 @@ package org.bitcoins.eclair.rpc.client import java.io.File +import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorSystem import akka.http.javadsl.model.headers.HttpCredentials @@ -38,11 +39,12 @@ class EclairRpcClient(val instance: EclairInstance)( import JsonReaders._ implicit val m = ActorMaterializer.create(system) - implicit val ec: ExecutionContext = m.executionContext private val logger = LoggerFactory.getLogger(this.getClass) def getDaemon: EclairInstance = instance + override implicit def executionContext: ExecutionContext = m.executionContext + override def allChannels(): Future[Vector[ChannelDesc]] = { eclairCall[Vector[ChannelDesc]]("allchannels") } @@ -66,10 +68,13 @@ class EclairRpcClient(val instance: EclairInstance)( /** * @inheritdoc */ - override def audit(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[AuditResult] = - eclairCall[AuditResult]("audit", Seq( - from.map(x => "from" -> x.toSeconds.toString), - to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) + override def audit( + from: Option[FiniteDuration], + to: Option[FiniteDuration]): Future[AuditResult] = + eclairCall[AuditResult]( + "audit", + Seq(from.map(x => "from" -> x.toSeconds.toString), + to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) override def channel(channelId: ChannelId): Future[ChannelResult] = { eclairCall[ChannelResult]("channel", "channelId" -> channelId.hex) @@ -110,7 +115,10 @@ class EclairRpcClient(val instance: EclairInstance)( close(channelId, Some(scriptPubKey)) } - override def connect(nodeId: NodeId, host: String, port: Int): Future[Unit] = { + override def connect( + nodeId: NodeId, + host: String, + port: Int): Future[Unit] = { val uri = NodeUri(nodeId, host, port) connect(uri) } @@ -119,20 +127,30 @@ class EclairRpcClient(val instance: EclairInstance)( eclairCall[String]("connect", "uri" -> uri.toString).map(_ => ()) } - override def findRoute(nodeId: NodeId, amountMsat: MilliSatoshis): Future[Vector[NodeId]] = { - eclairCall[Vector[NodeId]]("findroutetonode", "nodeId" -> nodeId.toString, "amountMsat" -> amountMsat.toBigDecimal.toString) + override def findRoute( + nodeId: NodeId, + amountMsat: MilliSatoshis): Future[Vector[NodeId]] = { + eclairCall[Vector[NodeId]]("findroutetonode", + "nodeId" -> nodeId.toString, + "amountMsat" -> amountMsat.toBigDecimal.toString) } override def findRoute(invoice: LnInvoice): Future[Vector[NodeId]] = { findRoute(invoice, None) } - override def findRoute(invoice: LnInvoice, amount: MilliSatoshis): Future[Vector[NodeId]] = { + override def findRoute( + invoice: LnInvoice, + amount: MilliSatoshis): Future[Vector[NodeId]] = { findRoute(invoice, Some(amount)) } - def findRoute(invoice: LnInvoice, amountMsat: Option[MilliSatoshis]): Future[Vector[NodeId]] = { - val params = Seq(Some("invoice" -> invoice.toString), amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString)).flatten + def findRoute( + invoice: LnInvoice, + amountMsat: Option[MilliSatoshis]): Future[Vector[NodeId]] = { + val params = Seq( + Some("invoice" -> invoice.toString), + amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString)).flatten eclairCall[Vector[NodeId]]("findroute", params: _*) } @@ -141,7 +159,8 @@ class EclairRpcClient(val instance: EclairInstance)( } override def forceClose(shortChannelId: ShortChannelId): Future[Unit] = { - eclairCall[String]("forceclose", "shortChannelId" -> shortChannelId.toString).map(_ => ()) + eclairCall[String]("forceclose", + "shortChannelId" -> shortChannelId.toString).map(_ => ()) } override def getInfo: Future[GetInfoResult] = { @@ -191,11 +210,13 @@ class EclairRpcClient(val instance: EclairInstance)( "pushMsat" -> _pushMsat, "fundingFeerateSatByte" -> feerateSatPerByte.get.toLong.toString) } else { - Seq("nodeId" -> nodeId.toString, + Seq( + "nodeId" -> nodeId.toString, "fundingSatoshis" -> _fundingSatoshis, "pushMsat" -> _pushMsat, "fundingFeerateSatByte" -> feerateSatPerByte.get.toLong.toString, - "channelFlags" -> channelFlags.get.toString) + "channelFlags" -> channelFlags.get.toString + ) } } @@ -275,23 +296,48 @@ class EclairRpcClient(val instance: EclairInstance)( createInvoice(description, None, None, None, None) } - override def createInvoice(description: String, amountMsat: MilliSatoshis): Future[LnInvoice] = { + override def createInvoice( + description: String, + amountMsat: MilliSatoshis): Future[LnInvoice] = { createInvoice(description, Some(amountMsat), None, None, None) } - override def createInvoice(description: String, amountMsat: MilliSatoshis, expireIn: FiniteDuration): Future[LnInvoice] = { + override def createInvoice( + description: String, + amountMsat: MilliSatoshis, + expireIn: FiniteDuration): Future[LnInvoice] = { createInvoice(description, Some(amountMsat), Some(expireIn), None, None) } - override def createInvoice(description: String, amountMsat: MilliSatoshis, paymentPreimage: PaymentPreimage): Future[LnInvoice] = { - createInvoice(description, Some(amountMsat), None, None, Some(paymentPreimage)) + override def createInvoice( + description: String, + amountMsat: MilliSatoshis, + paymentPreimage: PaymentPreimage): Future[LnInvoice] = { + createInvoice(description, + Some(amountMsat), + None, + None, + Some(paymentPreimage)) } - override def createInvoice(description: String, amountMsat: MilliSatoshis, expireIn: FiniteDuration, paymentPreimage: PaymentPreimage): Future[LnInvoice] = { - createInvoice(description, Some(amountMsat), Some(expireIn), None, Some(paymentPreimage)) + override def createInvoice( + description: String, + amountMsat: MilliSatoshis, + expireIn: FiniteDuration, + paymentPreimage: PaymentPreimage): Future[LnInvoice] = { + createInvoice(description, + Some(amountMsat), + Some(expireIn), + None, + Some(paymentPreimage)) } - override def createInvoice(description: String, amountMsat: Option[MilliSatoshis], expireIn: Option[FiniteDuration], fallbackAddress: Option[Address], paymentPreimage: Option[PaymentPreimage]): Future[LnInvoice] = { + override def createInvoice( + description: String, + amountMsat: Option[MilliSatoshis], + expireIn: Option[FiniteDuration], + fallbackAddress: Option[Address], + paymentPreimage: Option[PaymentPreimage]): Future[LnInvoice] = { val params = Seq( Some("description" -> description), amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString), @@ -302,9 +348,8 @@ class EclairRpcClient(val instance: EclairInstance)( val responseF = eclairCall[InvoiceResult]("createinvoice", params: _*) - responseF.flatMap { - res => - Future.fromTry(LnInvoice.fromString(res.serialized)) + responseF.flatMap { res => + Future.fromTry(LnInvoice.fromString(res.serialized)) } } @@ -316,11 +361,18 @@ class EclairRpcClient(val instance: EclairInstance)( payInvoice(invoice, None, None, None, None) } - override def payInvoice(invoice: LnInvoice, amount: MilliSatoshis): Future[PaymentId] = { + override def payInvoice( + invoice: LnInvoice, + amount: MilliSatoshis): Future[PaymentId] = { payInvoice(invoice, Some(amount), None, None, None) } - override def payInvoice(invoice: LnInvoice, amountMsat: Option[MilliSatoshis], maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId] = { + override def payInvoice( + invoice: LnInvoice, + amountMsat: Option[MilliSatoshis], + maxAttempts: Option[Int], + feeThresholdSat: Option[Satoshis], + maxFeePct: Option[Int]): Future[PaymentId] = { val params = Seq( Some("invoice" -> invoice.toString), amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString), @@ -332,27 +384,38 @@ class EclairRpcClient(val instance: EclairInstance)( eclairCall[PaymentId]("payinvoice", params: _*) } - override def getReceivedInfo(paymentHash: Sha256Digest): Future[ReceivedPaymentResult] = { - eclairCall[ReceivedPaymentResult]("getreceivedinfo", "paymentHash" -> paymentHash.hex) + override def getReceivedInfo( + paymentHash: Sha256Digest): Future[ReceivedPaymentResult] = { + eclairCall[ReceivedPaymentResult]("getreceivedinfo", + "paymentHash" -> paymentHash.hex) } - override def getReceivedInfo(invoice: LnInvoice): Future[ReceivedPaymentResult] = { - eclairCall[ReceivedPaymentResult]("getreceivedinfo", "invoice" -> invoice.toString) + override def getReceivedInfo( + invoice: LnInvoice): Future[ReceivedPaymentResult] = { + eclairCall[ReceivedPaymentResult]("getreceivedinfo", + "invoice" -> invoice.toString) } - override def getSentInfo(paymentHash: Sha256Digest): Future[Vector[PaymentResult]] = { - eclairCall[Vector[PaymentResult]]("getsentinfo", "paymentHash" -> paymentHash.hex) + override def getSentInfo( + paymentHash: Sha256Digest): Future[Vector[PaymentResult]] = { + eclairCall[Vector[PaymentResult]]("getsentinfo", + "paymentHash" -> paymentHash.hex) } override def getSentInfo(id: PaymentId): Future[Vector[PaymentResult]] = { eclairCall[Vector[PaymentResult]]("getsentinfo", "id" -> id.toString) } - override def sendToNode(nodeId: NodeId, amountMsat: MilliSatoshis, paymentHash: Sha256Digest, maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId] = { - val params = Seq( - "nodeId" -> nodeId.toString, - "amountMsat" -> amountMsat.toBigDecimal.toString, - "paymentHash" -> paymentHash.hex) ++ Seq( + override def sendToNode( + nodeId: NodeId, + amountMsat: MilliSatoshis, + paymentHash: Sha256Digest, + maxAttempts: Option[Int], + feeThresholdSat: Option[Satoshis], + maxFeePct: Option[Int]): Future[PaymentId] = { + val params = Seq("nodeId" -> nodeId.toString, + "amountMsat" -> amountMsat.toBigDecimal.toString, + "paymentHash" -> paymentHash.hex) ++ Seq( maxAttempts.map(x => "maxAttempts" -> x.toString), feeThresholdSat.map(x => "feeThresholdSat" -> x.toBigDecimal.toString), maxFeePct.map(x => "maxFeePct" -> x.toString) @@ -361,57 +424,75 @@ class EclairRpcClient(val instance: EclairInstance)( eclairCall[PaymentId]("sendtonode", params: _*) } - def sendToRoute(route: TraversableOnce[NodeId], amountMsat: MilliSatoshis, paymentHash: Sha256Digest, finalCltvExpiry: Long): Future[PaymentId] = { - eclairCall[PaymentId]("sendtoroute", + def sendToRoute( + route: TraversableOnce[NodeId], + amountMsat: MilliSatoshis, + paymentHash: Sha256Digest, + finalCltvExpiry: Long): Future[PaymentId] = { + eclairCall[PaymentId]( + "sendtoroute", "route" -> route.mkString(","), "amountMsat" -> amountMsat.toBigDecimal.toString, "paymentHash" -> paymentHash.hex, - "finalCltvExpiry" -> finalCltvExpiry.toString) + "finalCltvExpiry" -> finalCltvExpiry.toString + ) } override def updateRelayFee( channelId: ChannelId, feeBaseMsat: MilliSatoshis, feeProportionalMillionths: Long): Future[Unit] = { - eclairCall[Unit]("updaterelayfee", + eclairCall[Unit]( + "updaterelayfee", "channelId" -> channelId.hex, "feeBaseMsat" -> feeBaseMsat.toLong.toString, - "feeProportionalMillionths" -> feeProportionalMillionths.toString) + "feeProportionalMillionths" -> feeProportionalMillionths.toString + ) } override def updateRelayFee( shortChannelId: ShortChannelId, feeBaseMsat: MilliSatoshis, feeProportionalMillionths: Long): Future[Unit] = { - eclairCall[Unit]("updaterelayfee", + eclairCall[Unit]( + "updaterelayfee", "shortChannelId" -> shortChannelId.toHumanReadableString, "feeBaseMsat" -> feeBaseMsat.toLong.toString, - "feeProportionalMillionths" -> feeProportionalMillionths.toString) + "feeProportionalMillionths" -> feeProportionalMillionths.toString + ) } override def channelStats(): Future[Vector[ChannelStats]] = { eclairCall[Vector[ChannelStats]]("channelstats") } - override def networkFees(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]] = { - eclairCall[Vector[NetworkFeesResult]]("networkfees", Seq( - from.map(x => "from" -> x.toSeconds.toString), - to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) + override def networkFees( + from: Option[FiniteDuration], + to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]] = { + eclairCall[Vector[NetworkFeesResult]]( + "networkfees", + Seq(from.map(x => "from" -> x.toSeconds.toString), + to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) } override def getInvoice(paymentHash: Sha256Digest): Future[LnInvoice] = { - val resF = eclairCall[InvoiceResult]("getinvoice", "paymentHash" -> paymentHash.hex) - resF.flatMap { - res => - Future.fromTry(LnInvoice.fromString(res.serialized)) + val resF = + eclairCall[InvoiceResult]("getinvoice", "paymentHash" -> paymentHash.hex) + resF.flatMap { res => + Future.fromTry(LnInvoice.fromString(res.serialized)) } } - override def listInvoices(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[LnInvoice]] = { - val resF = eclairCall[Vector[InvoiceResult]]("listinvoices", Seq( - from.map(x => "from" -> x.toSeconds.toString), - to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) - resF.flatMap(xs => Future.sequence(xs.map(x => Future.fromTry(LnInvoice.fromString(x.serialized))))) + override def listInvoices( + from: Option[FiniteDuration], + to: Option[FiniteDuration]): Future[Vector[LnInvoice]] = { + val resF = eclairCall[Vector[InvoiceResult]]( + "listinvoices", + Seq(from.map(x => "from" -> x.toSeconds.toString), + to.map(x => "to" -> x.toSeconds.toString)).flatten: _*) + resF.flatMap(xs => + Future.sequence(xs.map(x => + Future.fromTry(LnInvoice.fromString(x.serialized))))) } override def usableBalances(): Future[Vector[UsableBalancesResult]] = { @@ -432,9 +513,9 @@ class EclairRpcClient(val instance: EclairInstance)( val payloadF: Future[JsValue] = responseF.flatMap(getPayload) payloadF.map { payload => - val validated: JsResult[T] = payload.validate[T] - val parsed: T = parseResult(validated, payload, command) - parsed + val validated: JsResult[T] = payload.validate[T] + val parsed: T = parseResult(validated, payload, command) + parsed } } @@ -568,4 +649,70 @@ class EclairRpcClient(val instance: EclairInstance)( def stop(): Option[Unit] = { process.map(_.destroy()) } + + /** + * Pings eclair to see if a invoice has been paid + * If the invoice has been paid or the payment has failed, we publish a + * [[org.bitcoins.eclair.rpc.json.PaymentResult PaymentResult]] + * event to the [[akka.actor.ActorSystem ActorSystem]]'s + * [[akka.event.EventStream ActorSystem.eventStream]] + * + * We also return a Future[PaymentResult] that is completed when one of three things is true + * 1. The payment has succeeded + * 2. The payment has failed + * 3. We have attempted to query the eclair more than maxAttempts, and the payment is still pending + */ + override def monitorSentPayment( + paymentId: PaymentId, + interval: FiniteDuration, + maxAttempts: Int): Future[PaymentResult] = { + val p: Promise[PaymentResult] = Promise[PaymentResult]() + + val runnable = new Runnable() { + + private val attempts = new AtomicInteger(0) + + override def run(): Unit = { + if (attempts.incrementAndGet() > maxAttempts) { + // too many tries to get info about a payment + // either Eclair is down or the payment is still in PENDING state for some reason + // complete the promise with an exception so the runnable will be canceled + p.failure( + new RuntimeException( + s"EclairApi.monitorSentPayment() too many attempts: ${attempts + .get()} for paymentId=${paymentId} for interval=${interval}")) + } else { + val resultsF = getSentInfo(paymentId) + resultsF.recover { + case e: Throwable => logger.error(s"Cannot check payment status for paymentId=${paymentId}", e) + } + val _ = for { + results <- resultsF + } yield { + results.foreach { result => + result.status match { + case PaymentStatus.PENDING => + //do nothing, while we wait for eclair to attempt to process + case PaymentStatus.SUCCEEDED | PaymentStatus.FAILED => + // invoice has been succeeded or has failed, let's publish to event stream + // so subscribers to the event stream can see that a payment + // was received or failed + // complete the promise so the runnable will be canceled + p.success(result) + } + } + } + } + } + } + + val cancellable = system.scheduler.schedule(interval, interval, runnable) + + val f = p.future + + f.onComplete(_ => cancellable.cancel()) + f.foreach(system.eventStream.publish) + + f + } }