Revert payment monitor (#600)

* Revert payment monitor

* Revert payment monitor

* expose EcecutionContext in EclairApi

* more configurable payment monitoring

* cleanup, more tests

* addressed comments
This commit is contained in:
rorp 2019-07-15 09:18:48 -07:00 committed by Chris Stewart
parent e8c334da87
commit b6b4431935
3 changed files with 253 additions and 64 deletions

View File

@ -170,6 +170,23 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
AsyncUtil.awaitConditionF(hasRoute, duration = 10.seconds, maxTries = 20).map(_ => succeed)
}
it should "pay an invoice and monitor the payment" in {
val checkPayment = {
(client: EclairRpcClient, otherClient: EclairRpcClient) =>
for {
_ <- openAndConfirmChannel(clientF, otherClientF)
invoice <- otherClient.createInvoice("abc", 50.msats)
paymentResult <- client.payAndMonitorInvoice(invoice, 1.second, 10)
} yield {
assert(paymentResult.amountMsat == 50.msats)
}
}
executeWithClientOtherClient(checkPayment)
}
it should "check a payment" in {
val checkPayment = {
(client: EclairRpcClient, otherClient: EclairRpcClient) =>
@ -532,8 +549,8 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
{
for {
channelId <- openAndConfirmChannel(clientF, otherClientF)
invoice <- otherClient.createInvoice("no amount", None, None, None, None)
paymentId <- client.payInvoice(invoice, Some(amt), None, None, None)
invoice <- otherClient.createInvoice("no amount")
paymentId <- client.payInvoice(invoice, amt)
_ <- EclairRpcTestUtil.awaitUntilPaymentSucceeded(client, paymentId)
succeeded <- client.getSentInfo(invoice.lnTags.paymentHash.hash)
_ <- client.close(channelId)

View File

@ -12,7 +12,7 @@ import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.eclair.rpc.json._
import org.bitcoins.eclair.rpc.network.NodeUri
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/**
@ -22,6 +22,8 @@ import scala.concurrent.{ExecutionContext, Future}
*/
trait EclairApi {
implicit def executionContext: ExecutionContext
def allChannels(): Future[Vector[ChannelDesc]]
def allNodes(): Future[Vector[NodeInfo]]
@ -108,7 +110,7 @@ trait EclairApi {
def networkFees(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]]
def nodeId()(implicit ec: ExecutionContext): Future[NodeId] = {
def nodeId(): Future[NodeId] = {
getNodeURI.map(_.nodeId)
}
@ -136,6 +138,29 @@ trait EclairApi {
def payInvoice(invoice: LnInvoice, amountMsat: Option[MilliSatoshis], maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId]
/**
* Pings eclair to see if a invoice has been paid and returns [[org.bitcoins.eclair.rpc.json.PaymentResult PaymentResult]]
*
* @param paymentId the payment id returnned by [[org.bitcoins.eclair.rpc.api.EclairApi.payInvoice()]]
* @param interval the ping interval
* @param maxAttempts the maximum number of pings
*
*/
def monitorSentPayment(paymentId: PaymentId, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult]
def payAndMonitorInvoice(invoice: LnInvoice, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult] =
for {
paymentId <- payInvoice(invoice)
paymentResult <- monitorSentPayment(paymentId, interval, maxAttempts)
} yield paymentResult
def payAndMonitorInvoice(invoice: LnInvoice, amount: MilliSatoshis, interval: FiniteDuration, maxAttempts: Int): Future[PaymentResult] =
for {
paymentId <- payInvoice(invoice, amount)
paymentResult <- monitorSentPayment(paymentId, interval, maxAttempts)
} yield paymentResult
def getSentInfo(paymentHash: Sha256Digest): Future[Vector[PaymentResult]]
def getSentInfo(id: PaymentId): Future[Vector[PaymentResult]]

View File

@ -1,6 +1,7 @@
package org.bitcoins.eclair.rpc.client
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.http.javadsl.model.headers.HttpCredentials
@ -38,11 +39,12 @@ class EclairRpcClient(val instance: EclairInstance)(
import JsonReaders._
implicit val m = ActorMaterializer.create(system)
implicit val ec: ExecutionContext = m.executionContext
private val logger = LoggerFactory.getLogger(this.getClass)
def getDaemon: EclairInstance = instance
override implicit def executionContext: ExecutionContext = m.executionContext
override def allChannels(): Future[Vector[ChannelDesc]] = {
eclairCall[Vector[ChannelDesc]]("allchannels")
}
@ -66,10 +68,13 @@ class EclairRpcClient(val instance: EclairInstance)(
/**
* @inheritdoc
*/
override def audit(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[AuditResult] =
eclairCall[AuditResult]("audit", Seq(
from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
override def audit(
from: Option[FiniteDuration],
to: Option[FiniteDuration]): Future[AuditResult] =
eclairCall[AuditResult](
"audit",
Seq(from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
override def channel(channelId: ChannelId): Future[ChannelResult] = {
eclairCall[ChannelResult]("channel", "channelId" -> channelId.hex)
@ -110,7 +115,10 @@ class EclairRpcClient(val instance: EclairInstance)(
close(channelId, Some(scriptPubKey))
}
override def connect(nodeId: NodeId, host: String, port: Int): Future[Unit] = {
override def connect(
nodeId: NodeId,
host: String,
port: Int): Future[Unit] = {
val uri = NodeUri(nodeId, host, port)
connect(uri)
}
@ -119,20 +127,30 @@ class EclairRpcClient(val instance: EclairInstance)(
eclairCall[String]("connect", "uri" -> uri.toString).map(_ => ())
}
override def findRoute(nodeId: NodeId, amountMsat: MilliSatoshis): Future[Vector[NodeId]] = {
eclairCall[Vector[NodeId]]("findroutetonode", "nodeId" -> nodeId.toString, "amountMsat" -> amountMsat.toBigDecimal.toString)
override def findRoute(
nodeId: NodeId,
amountMsat: MilliSatoshis): Future[Vector[NodeId]] = {
eclairCall[Vector[NodeId]]("findroutetonode",
"nodeId" -> nodeId.toString,
"amountMsat" -> amountMsat.toBigDecimal.toString)
}
override def findRoute(invoice: LnInvoice): Future[Vector[NodeId]] = {
findRoute(invoice, None)
}
override def findRoute(invoice: LnInvoice, amount: MilliSatoshis): Future[Vector[NodeId]] = {
override def findRoute(
invoice: LnInvoice,
amount: MilliSatoshis): Future[Vector[NodeId]] = {
findRoute(invoice, Some(amount))
}
def findRoute(invoice: LnInvoice, amountMsat: Option[MilliSatoshis]): Future[Vector[NodeId]] = {
val params = Seq(Some("invoice" -> invoice.toString), amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString)).flatten
def findRoute(
invoice: LnInvoice,
amountMsat: Option[MilliSatoshis]): Future[Vector[NodeId]] = {
val params = Seq(
Some("invoice" -> invoice.toString),
amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString)).flatten
eclairCall[Vector[NodeId]]("findroute", params: _*)
}
@ -141,7 +159,8 @@ class EclairRpcClient(val instance: EclairInstance)(
}
override def forceClose(shortChannelId: ShortChannelId): Future[Unit] = {
eclairCall[String]("forceclose", "shortChannelId" -> shortChannelId.toString).map(_ => ())
eclairCall[String]("forceclose",
"shortChannelId" -> shortChannelId.toString).map(_ => ())
}
override def getInfo: Future[GetInfoResult] = {
@ -191,11 +210,13 @@ class EclairRpcClient(val instance: EclairInstance)(
"pushMsat" -> _pushMsat,
"fundingFeerateSatByte" -> feerateSatPerByte.get.toLong.toString)
} else {
Seq("nodeId" -> nodeId.toString,
Seq(
"nodeId" -> nodeId.toString,
"fundingSatoshis" -> _fundingSatoshis,
"pushMsat" -> _pushMsat,
"fundingFeerateSatByte" -> feerateSatPerByte.get.toLong.toString,
"channelFlags" -> channelFlags.get.toString)
"channelFlags" -> channelFlags.get.toString
)
}
}
@ -275,23 +296,48 @@ class EclairRpcClient(val instance: EclairInstance)(
createInvoice(description, None, None, None, None)
}
override def createInvoice(description: String, amountMsat: MilliSatoshis): Future[LnInvoice] = {
override def createInvoice(
description: String,
amountMsat: MilliSatoshis): Future[LnInvoice] = {
createInvoice(description, Some(amountMsat), None, None, None)
}
override def createInvoice(description: String, amountMsat: MilliSatoshis, expireIn: FiniteDuration): Future[LnInvoice] = {
override def createInvoice(
description: String,
amountMsat: MilliSatoshis,
expireIn: FiniteDuration): Future[LnInvoice] = {
createInvoice(description, Some(amountMsat), Some(expireIn), None, None)
}
override def createInvoice(description: String, amountMsat: MilliSatoshis, paymentPreimage: PaymentPreimage): Future[LnInvoice] = {
createInvoice(description, Some(amountMsat), None, None, Some(paymentPreimage))
override def createInvoice(
description: String,
amountMsat: MilliSatoshis,
paymentPreimage: PaymentPreimage): Future[LnInvoice] = {
createInvoice(description,
Some(amountMsat),
None,
None,
Some(paymentPreimage))
}
override def createInvoice(description: String, amountMsat: MilliSatoshis, expireIn: FiniteDuration, paymentPreimage: PaymentPreimage): Future[LnInvoice] = {
createInvoice(description, Some(amountMsat), Some(expireIn), None, Some(paymentPreimage))
override def createInvoice(
description: String,
amountMsat: MilliSatoshis,
expireIn: FiniteDuration,
paymentPreimage: PaymentPreimage): Future[LnInvoice] = {
createInvoice(description,
Some(amountMsat),
Some(expireIn),
None,
Some(paymentPreimage))
}
override def createInvoice(description: String, amountMsat: Option[MilliSatoshis], expireIn: Option[FiniteDuration], fallbackAddress: Option[Address], paymentPreimage: Option[PaymentPreimage]): Future[LnInvoice] = {
override def createInvoice(
description: String,
amountMsat: Option[MilliSatoshis],
expireIn: Option[FiniteDuration],
fallbackAddress: Option[Address],
paymentPreimage: Option[PaymentPreimage]): Future[LnInvoice] = {
val params = Seq(
Some("description" -> description),
amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString),
@ -302,9 +348,8 @@ class EclairRpcClient(val instance: EclairInstance)(
val responseF = eclairCall[InvoiceResult]("createinvoice", params: _*)
responseF.flatMap {
res =>
Future.fromTry(LnInvoice.fromString(res.serialized))
responseF.flatMap { res =>
Future.fromTry(LnInvoice.fromString(res.serialized))
}
}
@ -316,11 +361,18 @@ class EclairRpcClient(val instance: EclairInstance)(
payInvoice(invoice, None, None, None, None)
}
override def payInvoice(invoice: LnInvoice, amount: MilliSatoshis): Future[PaymentId] = {
override def payInvoice(
invoice: LnInvoice,
amount: MilliSatoshis): Future[PaymentId] = {
payInvoice(invoice, Some(amount), None, None, None)
}
override def payInvoice(invoice: LnInvoice, amountMsat: Option[MilliSatoshis], maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId] = {
override def payInvoice(
invoice: LnInvoice,
amountMsat: Option[MilliSatoshis],
maxAttempts: Option[Int],
feeThresholdSat: Option[Satoshis],
maxFeePct: Option[Int]): Future[PaymentId] = {
val params = Seq(
Some("invoice" -> invoice.toString),
amountMsat.map(x => "amountMsat" -> x.toBigDecimal.toString),
@ -332,27 +384,38 @@ class EclairRpcClient(val instance: EclairInstance)(
eclairCall[PaymentId]("payinvoice", params: _*)
}
override def getReceivedInfo(paymentHash: Sha256Digest): Future[ReceivedPaymentResult] = {
eclairCall[ReceivedPaymentResult]("getreceivedinfo", "paymentHash" -> paymentHash.hex)
override def getReceivedInfo(
paymentHash: Sha256Digest): Future[ReceivedPaymentResult] = {
eclairCall[ReceivedPaymentResult]("getreceivedinfo",
"paymentHash" -> paymentHash.hex)
}
override def getReceivedInfo(invoice: LnInvoice): Future[ReceivedPaymentResult] = {
eclairCall[ReceivedPaymentResult]("getreceivedinfo", "invoice" -> invoice.toString)
override def getReceivedInfo(
invoice: LnInvoice): Future[ReceivedPaymentResult] = {
eclairCall[ReceivedPaymentResult]("getreceivedinfo",
"invoice" -> invoice.toString)
}
override def getSentInfo(paymentHash: Sha256Digest): Future[Vector[PaymentResult]] = {
eclairCall[Vector[PaymentResult]]("getsentinfo", "paymentHash" -> paymentHash.hex)
override def getSentInfo(
paymentHash: Sha256Digest): Future[Vector[PaymentResult]] = {
eclairCall[Vector[PaymentResult]]("getsentinfo",
"paymentHash" -> paymentHash.hex)
}
override def getSentInfo(id: PaymentId): Future[Vector[PaymentResult]] = {
eclairCall[Vector[PaymentResult]]("getsentinfo", "id" -> id.toString)
}
override def sendToNode(nodeId: NodeId, amountMsat: MilliSatoshis, paymentHash: Sha256Digest, maxAttempts: Option[Int], feeThresholdSat: Option[Satoshis], maxFeePct: Option[Int]): Future[PaymentId] = {
val params = Seq(
"nodeId" -> nodeId.toString,
"amountMsat" -> amountMsat.toBigDecimal.toString,
"paymentHash" -> paymentHash.hex) ++ Seq(
override def sendToNode(
nodeId: NodeId,
amountMsat: MilliSatoshis,
paymentHash: Sha256Digest,
maxAttempts: Option[Int],
feeThresholdSat: Option[Satoshis],
maxFeePct: Option[Int]): Future[PaymentId] = {
val params = Seq("nodeId" -> nodeId.toString,
"amountMsat" -> amountMsat.toBigDecimal.toString,
"paymentHash" -> paymentHash.hex) ++ Seq(
maxAttempts.map(x => "maxAttempts" -> x.toString),
feeThresholdSat.map(x => "feeThresholdSat" -> x.toBigDecimal.toString),
maxFeePct.map(x => "maxFeePct" -> x.toString)
@ -361,57 +424,75 @@ class EclairRpcClient(val instance: EclairInstance)(
eclairCall[PaymentId]("sendtonode", params: _*)
}
def sendToRoute(route: TraversableOnce[NodeId], amountMsat: MilliSatoshis, paymentHash: Sha256Digest, finalCltvExpiry: Long): Future[PaymentId] = {
eclairCall[PaymentId]("sendtoroute",
def sendToRoute(
route: TraversableOnce[NodeId],
amountMsat: MilliSatoshis,
paymentHash: Sha256Digest,
finalCltvExpiry: Long): Future[PaymentId] = {
eclairCall[PaymentId](
"sendtoroute",
"route" -> route.mkString(","),
"amountMsat" -> amountMsat.toBigDecimal.toString,
"paymentHash" -> paymentHash.hex,
"finalCltvExpiry" -> finalCltvExpiry.toString)
"finalCltvExpiry" -> finalCltvExpiry.toString
)
}
override def updateRelayFee(
channelId: ChannelId,
feeBaseMsat: MilliSatoshis,
feeProportionalMillionths: Long): Future[Unit] = {
eclairCall[Unit]("updaterelayfee",
eclairCall[Unit](
"updaterelayfee",
"channelId" -> channelId.hex,
"feeBaseMsat" -> feeBaseMsat.toLong.toString,
"feeProportionalMillionths" -> feeProportionalMillionths.toString)
"feeProportionalMillionths" -> feeProportionalMillionths.toString
)
}
override def updateRelayFee(
shortChannelId: ShortChannelId,
feeBaseMsat: MilliSatoshis,
feeProportionalMillionths: Long): Future[Unit] = {
eclairCall[Unit]("updaterelayfee",
eclairCall[Unit](
"updaterelayfee",
"shortChannelId" -> shortChannelId.toHumanReadableString,
"feeBaseMsat" -> feeBaseMsat.toLong.toString,
"feeProportionalMillionths" -> feeProportionalMillionths.toString)
"feeProportionalMillionths" -> feeProportionalMillionths.toString
)
}
override def channelStats(): Future[Vector[ChannelStats]] = {
eclairCall[Vector[ChannelStats]]("channelstats")
}
override def networkFees(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]] = {
eclairCall[Vector[NetworkFeesResult]]("networkfees", Seq(
from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
override def networkFees(
from: Option[FiniteDuration],
to: Option[FiniteDuration]): Future[Vector[NetworkFeesResult]] = {
eclairCall[Vector[NetworkFeesResult]](
"networkfees",
Seq(from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
}
override def getInvoice(paymentHash: Sha256Digest): Future[LnInvoice] = {
val resF = eclairCall[InvoiceResult]("getinvoice", "paymentHash" -> paymentHash.hex)
resF.flatMap {
res =>
Future.fromTry(LnInvoice.fromString(res.serialized))
val resF =
eclairCall[InvoiceResult]("getinvoice", "paymentHash" -> paymentHash.hex)
resF.flatMap { res =>
Future.fromTry(LnInvoice.fromString(res.serialized))
}
}
override def listInvoices(from: Option[FiniteDuration], to: Option[FiniteDuration]): Future[Vector[LnInvoice]] = {
val resF = eclairCall[Vector[InvoiceResult]]("listinvoices", Seq(
from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
resF.flatMap(xs => Future.sequence(xs.map(x => Future.fromTry(LnInvoice.fromString(x.serialized)))))
override def listInvoices(
from: Option[FiniteDuration],
to: Option[FiniteDuration]): Future[Vector[LnInvoice]] = {
val resF = eclairCall[Vector[InvoiceResult]](
"listinvoices",
Seq(from.map(x => "from" -> x.toSeconds.toString),
to.map(x => "to" -> x.toSeconds.toString)).flatten: _*)
resF.flatMap(xs =>
Future.sequence(xs.map(x =>
Future.fromTry(LnInvoice.fromString(x.serialized)))))
}
override def usableBalances(): Future[Vector[UsableBalancesResult]] = {
@ -432,9 +513,9 @@ class EclairRpcClient(val instance: EclairInstance)(
val payloadF: Future[JsValue] = responseF.flatMap(getPayload)
payloadF.map { payload =>
val validated: JsResult[T] = payload.validate[T]
val parsed: T = parseResult(validated, payload, command)
parsed
val validated: JsResult[T] = payload.validate[T]
val parsed: T = parseResult(validated, payload, command)
parsed
}
}
@ -568,4 +649,70 @@ class EclairRpcClient(val instance: EclairInstance)(
def stop(): Option[Unit] = {
process.map(_.destroy())
}
/**
* Pings eclair to see if a invoice has been paid
* If the invoice has been paid or the payment has failed, we publish a
* [[org.bitcoins.eclair.rpc.json.PaymentResult PaymentResult]]
* event to the [[akka.actor.ActorSystem ActorSystem]]'s
* [[akka.event.EventStream ActorSystem.eventStream]]
*
* We also return a Future[PaymentResult] that is completed when one of three things is true
* 1. The payment has succeeded
* 2. The payment has failed
* 3. We have attempted to query the eclair more than maxAttempts, and the payment is still pending
*/
override def monitorSentPayment(
paymentId: PaymentId,
interval: FiniteDuration,
maxAttempts: Int): Future[PaymentResult] = {
val p: Promise[PaymentResult] = Promise[PaymentResult]()
val runnable = new Runnable() {
private val attempts = new AtomicInteger(0)
override def run(): Unit = {
if (attempts.incrementAndGet() > maxAttempts) {
// too many tries to get info about a payment
// either Eclair is down or the payment is still in PENDING state for some reason
// complete the promise with an exception so the runnable will be canceled
p.failure(
new RuntimeException(
s"EclairApi.monitorSentPayment() too many attempts: ${attempts
.get()} for paymentId=${paymentId} for interval=${interval}"))
} else {
val resultsF = getSentInfo(paymentId)
resultsF.recover {
case e: Throwable => logger.error(s"Cannot check payment status for paymentId=${paymentId}", e)
}
val _ = for {
results <- resultsF
} yield {
results.foreach { result =>
result.status match {
case PaymentStatus.PENDING =>
//do nothing, while we wait for eclair to attempt to process
case PaymentStatus.SUCCEEDED | PaymentStatus.FAILED =>
// invoice has been succeeded or has failed, let's publish to event stream
// so subscribers to the event stream can see that a payment
// was received or failed
// complete the promise so the runnable will be canceled
p.success(result)
}
}
}
}
}
}
val cancellable = system.scheduler.schedule(interval, interval, runnable)
val f = p.future
f.onComplete(_ => cancellable.cancel())
f.foreach(system.eventStream.publish)
f
}
}