Make start methods return Future[Unit] (#328)

WIP: Refactoring EclairTestUtil to use Futures

Finished refactor EclairTestUtil to use futures

Address code review

fix async bug in chan opening

Change snapshot version to something that should compile
This commit is contained in:
Chris Stewart 2019-02-07 19:58:18 -06:00 committed by GitHub
parent 248c500512
commit 3200e6a1e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 726 additions and 431 deletions

View file

@ -40,6 +40,8 @@ lazy val commonSettings = List(
scalacOptions in Test := testCompilerOpts,
testOptions in Test += Tests.Argument("-oF"),
assemblyOption in assembly := (assemblyOption in assembly).value
.copy(includeScala = false),

View file

@ -11,10 +11,10 @@ import akka.stream.ActorMaterializer
import akka.util.ByteString
import org.bitcoins.core.crypto.Sha256Digest
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.protocol.ln.{LnInvoice, LnParams, ShortChannelId}
import org.bitcoins.core.protocol.ln.channel.{ChannelId, FundedChannelId}
import org.bitcoins.core.protocol.ln.currency.{LnCurrencyUnit, MilliSatoshis}
import org.bitcoins.core.protocol.ln.node.NodeId
import org.bitcoins.core.protocol.ln.{LnInvoice, LnParams, ShortChannelId}
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.util.BitcoinSUtil
import org.bitcoins.core.wallet.fee.SatoshisPerByte
@ -23,13 +23,14 @@ import org.bitcoins.eclair.rpc.config.EclairInstance
import org.bitcoins.eclair.rpc.json._
import org.bitcoins.eclair.rpc.network.{NodeUri, PeerState}
import org.bitcoins.rpc.serializers.JsonReaders._
import org.bitcoins.rpc.util.AsyncUtil
import org.slf4j.LoggerFactory
import play.api.libs.json._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.sys.process._
import scala.util.{Failure, Properties, Success, Try}
import scala.util.{Failure, Properties, Success}
class EclairRpcClient(val instance: EclairInstance)(
implicit system: ActorSystem)
@ -559,7 +560,17 @@ class EclairRpcClient(val instance: EclairInstance)(
private var process: Option[Process] = None
def start(): Unit = {
/** Starts eclair on the local system.
*
* @return a future that completes when eclair is fully started.
* If eclair has not successfully started in 60 seconds
* the future times out.
*/
def start(): Future[Unit] = {
val _ = {
require(instance.authCredentials.datadir.isDefined, s"A datadir needs to be provided to start eclair")
if (process.isEmpty) {
val p = Process(
@ -574,12 +585,26 @@ class EclairRpcClient(val instance: EclairInstance)(
logger.info(s"Eclair was already started!")
()
}
}
def isStarted(): Boolean = {
val t = Try(Await.result(getInfo, 1.second))
t.isSuccess
val started = AsyncUtil.retryUntilSatisfiedF(
() => isStarted,
duration = 1.seconds,
maxTries = 60)
started
}
def isStarted(): Future[Boolean] = {
val p = Promise[Boolean]()
getInfo.onComplete {
case Success(_) => p.success(true)
case Failure(_) => p.success(false)
}
p.future
}
def stop(): Option[Unit] = {

View file

@ -5,9 +5,11 @@ sealed abstract class PeerState
object PeerState {
case object CONNECTED extends PeerState
case object INITIALIZING extends PeerState
case object DISCONNECTED extends PeerState
private val all = List(CONNECTED, DISCONNECTED)
private val all = List(CONNECTED, INITIALIZING, DISCONNECTED)
def fromString(str: String): Option[PeerState] = {
all.find(_.toString == str)

View file

@ -7,7 +7,7 @@ import org.bitcoins.core.config.RegTest
import org.bitcoins.core.currency.{CurrencyUnit, CurrencyUnits, Satoshis}
import org.bitcoins.core.number.Int64
import org.bitcoins.core.protocol.ln.LnParams.LnBitcoinRegTest
import org.bitcoins.core.protocol.ln.channel.{ChannelId, ChannelState}
import org.bitcoins.core.protocol.ln.channel.{ChannelId, ChannelState, FundedChannelId}
import org.bitcoins.core.protocol.ln.currency._
import org.bitcoins.core.protocol.ln.node.NodeId
import org.bitcoins.core.util.BitcoinSLogger
@ -32,42 +32,90 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
val logger: Logger = BitcoinSLogger.logger
val bitcoindRpcClient: BitcoindRpcClient =
BitcoindRpcTestUtil.startedBitcoindRpcClient()
lazy val EclairNodes4(firstClient, secondClient, thirdClient, fourthClient) = {
val EclairNodes4(first, second, third, fourth) =
EclairRpcTestUtil.createNodeLink(bitcoindRpcClient)
clients ++= List(first, second, third, fourth)
EclairNodes4(first, second, third, fourth)
val bitcoindRpcClientF: Future[BitcoindRpcClient] = {
val cliF = BitcoindRpcTestUtil.startedBitcoindRpcClient()
// make sure we have enough money open channels
//not async safe
val blocksF = cliF.flatMap(_.generate(200))
blocksF.flatMap(_ => cliF)
}
lazy val (client, otherClient) = {
val (c1, c2) = EclairRpcTestUtil.createNodePair(Some(bitcoindRpcClient))
clients += c1
clients += c2
(c1, c2)
val eclairNodesF: Future[EclairNodes4] = {
bitcoindRpcClientF.flatMap { bitcoindRpcClient =>
val nodesF = EclairRpcTestUtil.createNodeLink(bitcoindRpcClient)
val addedF = nodesF.map { nodes =>
clients ++= List(nodes.c1, nodes.c2, nodes.c3, nodes.c4)
}
addedF.flatMap(_ => nodesF)
}
}
lazy val firstClientF = eclairNodesF.map(_.c1)
lazy val secondClientF = eclairNodesF.map(_.c2)
lazy val thirdClientF = eclairNodesF.map(_.c3)
lazy val fourthClientF = eclairNodesF.map(_.c4)
/** There is specific cases where we just need two clients,
* so this is a helper val that pairs two connected
* clients together with an open channel
*/
lazy val clientOtherClientF = {
//use second and third client above since they
//aren't really being used in the tests that use eclairNodesF
secondClientF.flatMap(s => thirdClientF.map(t => (s,t)))
}
lazy val clientF = clientOtherClientF.map(_._1)
lazy val otherClientF = clientOtherClientF.map(_._2)
private val clients =
Vector.newBuilder[EclairRpcClient]
override def beforeAll(): Unit = {
// make sure we have enough money open channels
bitcoindRpcClient.generate(200)
/** Executes a test with the default clients defined at the
* top of this file
* @param test
* @tparam T
* @return
*/
def executeWithClientOtherClient[T](test: (EclairRpcClient,EclairRpcClient) => Future[T]): Future[T] = {
executeSpecificClients(clientF = clientF,
otherClientF = otherClientF,
test = test)
}
/** Executes the test with the clients passed as a parameter */
def executeSpecificClients[T](clientF: Future[EclairRpcClient],
otherClientF: Future[EclairRpcClient],
test: (EclairRpcClient,EclairRpcClient) => Future[T]): Future[T] = {
clientF.flatMap { c1 =>
otherClientF.flatMap { other =>
test(c1,other)
}
}
}
behavior of "RpcClient"
it should "be able to open and close a channel" in {
val changeAddrF = bitcoindRpcClient.getNewAddress()
val changeAddrF = bitcoindRpcClientF.flatMap(_.getNewAddress())
val result: Future[Assertion] = {
val isOpenedF: Future[(ChannelId, Assertion)] = {
val getChannelId = (client: EclairRpcClient, otherClient: EclairRpcClient) => {
otherClient.getInfo.flatMap { info =>
val amt = Satoshis(Int64(100000))
val openedChanF = client.open(info.nodeId, amt)
val openedChanF = clientF.flatMap(_.open(info.nodeId, amt))
openedChanF.flatMap { channelId =>
val exists = hasChannel(client, channelId)
@ -75,29 +123,43 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
}
}
executeWithClientOtherClient[(ChannelId,Assertion)](getChannelId)
}
val isConfirmedF: Future[(ChannelId, Assertion)] = {
isOpenedF.map {
val getIsConfirmed = {
(client: EclairRpcClient, _: EclairRpcClient) =>
isOpenedF.flatMap {
case (chanId, assertion) =>
val _ = bitcoindRpcClient.generate(6)
val generatedF = bitcoindRpcClientF.flatMap(_.generate(6))
val normalF = generatedF.flatMap { _ =>
EclairRpcTestUtil.awaitUntilChannelNormal(
client = client,
chanId = chanId
)
(chanId, assertion)
}
normalF.map(_ => (chanId, assertion))
}
}
executeWithClientOtherClient(getIsConfirmed)
}
val isClosedF = {
isConfirmedF.flatMap {
val getIsClosed = {
(client: EclairRpcClient, _: EclairRpcClient) => isConfirmedF.flatMap {
case (chanId, assertion) =>
val closedF = changeAddrF.flatMap { addr =>
client.close(chanId, addr.scriptPubKey)
}
val closedF = client.close(chanId, addr.scriptPubKey)
closedF.flatMap { _ =>
EclairRpcTestUtil.awaitUntilChannelClosing(client, chanId)
}
}
closedF.flatMap { _ =>
val chanF = client.channel(chanId)
chanF.map { chan =>
assert(chan.state == ChannelState.CLOSING)
@ -106,15 +168,18 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
}
executeWithClientOtherClient(getIsClosed)
}
val closedOnChainF = {
isClosedF.flatMap { _ =>
changeAddrF.flatMap { addr =>
val amountF =
bitcoindRpcClientF.flatMap { bitcoindRpcClient =>
bitcoindRpcClient.getReceivedByAddress(address = addr,
minConfirmations = 0)
}
amountF.map(amt => assert(amt > CurrencyUnits.zero))
}
}
}
@ -126,31 +191,65 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
it should "fail to authenticate on bad password" in {
val goodCredentials = client.instance.authCredentials
val badCredentials = EclairAuthCredentials("bad_password",
goodCredentials.bitcoinAuthOpt,
goodCredentials.port)
val badInstance = EclairInstance(client.instance.network,
client.instance.uri,
client.instance.rpcUri,
badCredentials)
val badClient = new EclairRpcClient(badInstance)
val goodCredentialsF = {
val getAuthCredentials = {
(client: EclairRpcClient, _: EclairRpcClient) =>
Future.successful(client.instance.authCredentials)
}
executeWithClientOtherClient[EclairAuthCredentials](getAuthCredentials)
}
val badCredentialsF = goodCredentialsF.map { good =>
EclairAuthCredentials("bad_password",
good.bitcoinAuthOpt,
good.port)
}
val badInstanceF = badCredentialsF.flatMap { badCredentials =>
val getBadInstance = (client: EclairRpcClient, _: EclairRpcClient) => {
val instance = EclairInstance(network = client.instance.network,
uri = client.instance.uri,
rpcUri = client.instance.rpcUri,
authCredentials = badCredentials)
Future.successful(instance)
}
executeWithClientOtherClient(getBadInstance)
}
val badClientF = badInstanceF.map(new EclairRpcClient(_))
badClientF.flatMap { badClient =>
recoverToSucceededIf[RuntimeException](badClient.getInfo)
}
}
it should "be able to list an existing peer and isConnected must be true" in {
//test assumes that a connection to a peer was made in `beforeAll`
val otherClientNodeIdF = otherClient.getInfo.map(_.nodeId)
val otherClientNodeIdF = {
val getOtherClientNodeId = {
(_:EclairRpcClient, otherClient: EclairRpcClient) =>
otherClient.getInfo.map(_.nodeId)
}
otherClientNodeIdF.flatMap(nid => hasConnection(client, nid))
executeWithClientOtherClient(getOtherClientNodeId)
}
otherClientNodeIdF.flatMap(nid => hasConnection(clientF, nid))
}
it should "ble able to pay to a hash" in {
val amt = MilliSatoshis(50)
val getPayment = {
(client: EclairRpcClient,otherClient: EclairRpcClient) => {
for {
channelId <- openAndConfirmChannel(client, otherClient)
channelId <- openAndConfirmChannel(clientF, otherClientF)
otherClientNodeId <- otherClient.getInfo.map(_.nodeId)
channels <- client.channels(otherClientNodeId)
// without this we've been getting "route not found"
@ -161,54 +260,70 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
invoice.lnTags.paymentHash.hash,
otherClientNodeId)
_ <- client.close(channelId)
_ <- bitcoindRpcClient.generate(6)
_ <- bitcoindRpcClientF.flatMap(_.generate(6))
} yield {
assert(payment.isInstanceOf[PaymentSucceeded])
val succeeded = payment.asInstanceOf[PaymentSucceeded]
assert(succeeded.amountMsat == amt)
}
}
}
executeWithClientOtherClient(getPayment)
}
it should "be able to generate an invoice with amount and pay it" in {
val amt = MilliSatoshis(50)
val getPaymentWithAmount = {
(client: EclairRpcClient, otherClient: EclairRpcClient) => {
for {
channelId <- openAndConfirmChannel(client, otherClient)
channelId <- openAndConfirmChannel(clientF, otherClientF)
invoice <- otherClient.receive(amt.toLnCurrencyUnit)
payment <- client.send(invoice)
_ <- client.close(channelId)
_ <- bitcoindRpcClient.generate(6)
_ <- bitcoindRpcClientF.flatMap(_.generate(6))
} yield {
assert(payment.isInstanceOf[PaymentSucceeded])
val succeeded = payment.asInstanceOf[PaymentSucceeded]
assert(succeeded.amountMsat == amt)
}
}
}
executeWithClientOtherClient(getPaymentWithAmount)
}
it should "be able to generate an invoice without amount and pay it" in {
val amt = MilliSatoshis(50)
val getPaymentNoAmount = {
(client: EclairRpcClient, otherClient: EclairRpcClient) => {
for {
channelId <- openAndConfirmChannel(client, otherClient)
channelId <- openAndConfirmChannel(clientF, otherClientF)
invoice <- otherClient.receive("no amount")
payment <- client.send(invoice, amt.toLnCurrencyUnit)
_ <- client.close(channelId)
_ <- bitcoindRpcClient.generate(6)
_ <- bitcoindRpcClientF.flatMap(_.generate(6))
} yield {
assert(payment.isInstanceOf[PaymentSucceeded])
val succeeded = payment.asInstanceOf[PaymentSucceeded]
assert(succeeded.amountMsat == amt)
}
}
}
executeWithClientOtherClient(getPaymentNoAmount)
}
it should "be able to generate an invoice and get the same amount back" in {
val amt = PicoBitcoins(10) //this is the smallest unit we can use, 1 msat
val description = "bitcoin-s test case"
val expiry = (System.currentTimeMillis() / 1000)
val invoiceF = client.receive(description = description,
val invoiceF = clientF.flatMap(_.receive(description = description,
amountMsat = amt,
expirySeconds = expiry)
expirySeconds = expiry))
val assert0: Future[Assertion] = {
invoiceF.map { i =>
@ -219,9 +334,9 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
val amt1 = NanoBitcoins.one
val invoice1F = client.receive(description = description,
val invoice1F = clientF.flatMap(_.receive(description = description,
amountMsat = amt1,
expirySeconds = expiry)
expirySeconds = expiry))
val assert1 = {
invoice1F.map { i =>
@ -232,9 +347,9 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
val amt2 = MicroBitcoins.one
val invoice2F = client.receive(description = description,
val invoice2F = clientF.flatMap(_.receive(description = description,
amountMsat = amt2,
expirySeconds = expiry)
expirySeconds = expiry))
val assert2 = {
invoice2F.map { i =>
@ -247,9 +362,9 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
val amt3 = MilliBitcoins.one
val invoice3F = client.receive(description = description,
val invoice3F = clientF.flatMap(_.receive(description = description,
amountMsat = amt3,
expirySeconds = expiry)
expirySeconds = expiry))
val assert3 = {
invoice3F.map { i =>
@ -270,12 +385,12 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
val description = "bitcoin-s test case"
val expiry = (System.currentTimeMillis() / 1000)
val invoiceF = client.receive(description = description,
val invoiceF = clientF.flatMap(_.receive(description = description,
amountMsat = amt,
expirySeconds = expiry)
expirySeconds = expiry))
val paymentRequestF: Future[PaymentRequest] = invoiceF.flatMap { i =>
client.checkInvoice(i)
clientF.flatMap(_.checkInvoice(i))
}
paymentRequestF.map { paymentRequest =>
@ -285,13 +400,13 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
it should "open a channel, send a payment, and close the channel" in {
val openChannelIdF = openAndConfirmChannel(client, otherClient)
val openChannelIdF = openAndConfirmChannel(clientF, otherClientF)
val paymentAmount = NanoBitcoins(100000)
val invoiceF =
openChannelIdF.flatMap(_ => otherClient.receive(paymentAmount))
openChannelIdF.flatMap(_ => otherClientF.flatMap(_.receive(paymentAmount)))
val paymentF = invoiceF.flatMap(i => client.send(i))
val paymentF = invoiceF.flatMap(i => clientF.flatMap(_.send(i)))
val isCorrectAmountF = paymentF.map { p =>
assert(p.isInstanceOf[PaymentSucceeded])
@ -304,11 +419,17 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
val closedChannelF: Future[Assertion] = isCorrectAmountF.flatMap { _ =>
openChannelIdF.flatMap { cid =>
val closedF = client.close(cid)
val closedF = clientF.flatMap(_.close(cid))
val otherClientClosedF = otherClientF.flatMap { otherClient =>
closedF.flatMap { _ =>
EclairRpcTestUtil.awaitUntilChannelClosing(otherClient, cid)
}
}
val otherStateF = closedF.flatMap(_ => otherClient.channel(cid))
val otherStateF = otherClientClosedF.flatMap { _ =>
otherClientF.flatMap(_.channel(cid))
}
val isClosed = otherStateF.map { channel =>
assert(channel.state == ChannelState.CLOSING)
@ -326,23 +447,23 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
it should "check a payment" in {
val openChannelIdF = openAndConfirmChannel(client, otherClient)
val openChannelIdF = openAndConfirmChannel(clientF, otherClientF)
val paymentAmount = NanoBitcoins(100000)
val invoiceF =
openChannelIdF.flatMap(_ => otherClient.receive(paymentAmount))
openChannelIdF.flatMap(_ => otherClientF.flatMap(_.receive(paymentAmount)))
val isPaid1F = invoiceF.flatMap(i => otherClient.checkPayment(Left(i)))
val isPaid1F = invoiceF.flatMap(i => otherClientF.flatMap(_.checkPayment(Left(i))))
val isNotPaidAssertF = isPaid1F.map(isPaid => assert(!isPaid))
//send the payment now
val paidF: Future[PaymentResult] = invoiceF.flatMap(i => client.send(i))
val paidF: Future[PaymentResult] = invoiceF.flatMap(i => clientF.flatMap(_.send(i)))
val isPaid2F: Future[Boolean] = paidF.flatMap { p =>
val succeed = p.asInstanceOf[PaymentSucceeded]
otherClient.checkPayment(Right(succeed.paymentHash))
otherClientF.flatMap(_.checkPayment(Right(succeed.paymentHash)))
}
val isPaidAssertF = isPaid2F.map(isPaid => assert(isPaid))
@ -356,31 +477,31 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
it should "be able to send payments in both directions" in {
val openChannelIdF = openAndConfirmChannel(client, otherClient)
val openChannelIdF = openAndConfirmChannel(clientF, otherClientF)
val paymentAmount = NanoBitcoins(100000)
val invoiceF =
openChannelIdF.flatMap(_ => otherClient.receive(paymentAmount))
openChannelIdF.flatMap(_ => otherClientF.flatMap(_.receive(paymentAmount)))
//send the payment now
val paidF: Future[PaymentResult] = invoiceF.flatMap(i => client.send(i))
val paidF: Future[PaymentResult] = invoiceF.flatMap(i => clientF.flatMap(_.send(i)))
val isPaidF: Future[Boolean] = paidF.flatMap { p =>
val succeed = p.asInstanceOf[PaymentSucceeded]
otherClient.checkPayment(Right(succeed.paymentHash))
otherClientF.flatMap(_.checkPayment(Right(succeed.paymentHash)))
}
val isPaidAssertF = isPaidF.map(isPaid => assert(isPaid))
isPaidAssertF.flatMap { isPaid =>
val invoice2F = openChannelIdF.flatMap(_ => client.receive(paymentAmount))
val invoice2F = openChannelIdF.flatMap(_ => clientF.flatMap(_.receive(paymentAmount)))
//send the payment now
val paid2F: Future[PaymentResult] =
invoice2F.flatMap((i => otherClient.send(i)))
invoice2F.flatMap((i => otherClientF.flatMap(_.send(i))))
val isPaid2F: Future[Boolean] = paid2F.flatMap { p =>
assert(p.isInstanceOf[PaymentSucceeded])
val succeed = p.asInstanceOf[PaymentSucceeded]
client.checkPayment(Right(succeed.paymentHash))
clientF.flatMap(_.checkPayment(Right(succeed.paymentHash)))
}
isPaid2F.map(isPaid => assert(isPaid))
@ -389,8 +510,8 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
it should "update the relay fee of a channel" in {
val channelAndFeeF = for {
channel <- openAndConfirmChannel(client, otherClient)
feeOpt <- client.channel(channel).map(_.feeBaseMsat)
channel <- openAndConfirmChannel(clientF, otherClientF)
feeOpt <- clientF.flatMap(_.channel(channel).map(_.feeBaseMsat))
} yield {
assert(feeOpt.isDefined)
assert(feeOpt.get > MilliSatoshis.zero)
@ -399,8 +520,8 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
for {
(channel, oldFee) <- channelAndFeeF
_ <- client.updateRelayFee(channel, MilliSatoshis(oldFee.toLong * 2), 1)
newFeeOpt <- client.channel(channel).map(_.feeBaseMsat)
_ <- clientF.flatMap(_.updateRelayFee(channel, MilliSatoshis(oldFee.toLong * 2), 1))
newFeeOpt <- clientF.flatMap(_.channel(channel).map(_.feeBaseMsat))
} yield {
assert(newFeeOpt.isDefined)
assert(newFeeOpt.get == MilliSatoshis(oldFee.toLong * 2))
@ -408,19 +529,19 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
it should "get all channels" in {
client.allChannels().flatMap(_ => succeed)
clientF.flatMap(_.allChannels().flatMap(_ => succeed))
}
it should "get all channel updates" in {
client.allUpdates().flatMap { _ =>
clientF.flatMap(_.allUpdates().flatMap { _ =>
succeed
}
})
}
it should "get a route to a node ID" in {
val hasRoute = () => {
fourthClient.getInfo
.flatMap(info => firstClient.findRoute(info.nodeId))
fourthClientF.flatMap(_.getInfo)
.flatMap(info => firstClientF.flatMap(_.findRoute(info.nodeId)))
.map(route => route.length == 4)
.recover {
case err: RuntimeException
@ -437,9 +558,11 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
it should "get a route to an invoice" in {
val hasRoute = () => {
fourthClient
.receive("foo")
.flatMap(invoice => firstClient.findRoute(invoice))
fourthClientF.flatMap { fourthClient =>
fourthClient.receive("foo")
.flatMap { invoice =>
firstClientF.flatMap(_.findRoute(invoice))
}
.map(route => route.length == 4)
.recover {
case err: RuntimeException
@ -448,6 +571,8 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
}
}
// Eclair is a bit slow in propagating channel changes
AsyncUtil.awaitConditionF(hasRoute, duration = 1000.millis, maxTries = 10)
@ -456,19 +581,14 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
it should "send some payments and get the audit info" in {
for {
invoice <- fourthClient.receive(MilliSatoshis(50000).toLnCurrencyUnit)
_ <- firstClient
.send(invoice)
.map(payment => assert(payment.isInstanceOf[PaymentSucceeded]))
received <- fourthClient
.audit()
invoice <- fourthClientF.flatMap(_.receive(MilliSatoshis(50000).toLnCurrencyUnit))
_ <- firstClientF.flatMap(_.send(invoice)
.map(payment => assert(payment.isInstanceOf[PaymentSucceeded])))
received <- fourthClientF.flatMap(_.audit())
.map(_.received) // check for received payments
relayed <- secondClient
.audit()
.map(_.relayed) // check for relayed payments
sent <- firstClient
.audit()
.map(_.sent) // check for sent payments
relayed <- secondClientF.flatMap(_.audit()).map(_.relayed) // check for relayed payments
sent <- firstClientF.flatMap(_.audit()).map(_.sent) // check for sent payments
} yield {
assert(received.nonEmpty)
assert(relayed.nonEmpty)
@ -480,44 +600,63 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
// needs nodes with activity both related and not related
// to them
it should "get all channel updates for a given node ID" in {
val (firstFreshClient, secondFreshClient) =
val freshClients1F = bitcoindRpcClientF.flatMap { bitcoindRpcClient =>
EclairRpcTestUtil.createNodePair(Some(bitcoindRpcClient))
val (thirdFreshClient, fourthFreshClient) =
}
val freshClients2F = bitcoindRpcClientF.flatMap { bitcoindRpcClient =>
EclairRpcTestUtil.createNodePair(Some(bitcoindRpcClient))
}
clients ++= List(firstFreshClient,
secondFreshClient,
thirdFreshClient,
fourthFreshClient)
EclairRpcTestUtil.connectLNNodes(firstFreshClient, thirdFreshClient)
EclairRpcTestUtil.connectLNNodes(firstFreshClient, fourthFreshClient)
val connectedClientsF: Future[EclairNodes4] = {
freshClients1F.flatMap { case (freshClient1, freshClient2) =>
freshClients2F.flatMap { case (freshClient3,freshClient4) =>
def block[T](fut: Future[T]): T = Await.result(fut, 60.seconds)
clients ++= List(freshClient1,
freshClient2,
freshClient3,
freshClient4)
def openChannel(c1: EclairRpcClient, c2: EclairRpcClient) = {
block(
val connect1And3 = EclairRpcTestUtil.connectLNNodes(freshClient1, freshClient3)
val connect1And4 = EclairRpcTestUtil.connectLNNodes(freshClient1, freshClient4)
connect1And3.flatMap { _ =>
connect1And4.map { _ =>
EclairNodes4(freshClient1, freshClient2, freshClient3, freshClient4)
}
}
}
}
}
def openChannel(c1: EclairRpcClient, c2: EclairRpcClient): Future[FundedChannelId] = {
EclairRpcTestUtil
.openChannel(c1, c2, Satoshis(Int64(500000)), MilliSatoshis(500000)))
.openChannel(c1, c2, Satoshis(Int64(500000)), MilliSatoshis(500000))
}
openChannel(firstFreshClient, secondFreshClient)
openChannel(thirdFreshClient, fourthFreshClient)
block(bitcoindRpcClient.generate(10))
block(EclairRpcTestUtil.sendPayments(firstFreshClient, secondFreshClient))
block(EclairRpcTestUtil.sendPayments(thirdFreshClient, fourthFreshClient))
def updateIsInChannels(channels: Seq[OpenChannelInfo])(
update: ChannelUpdate): Boolean = {
channels.exists(_.shortChannelId == update.shortChannelId)
val openedChannelsF: Future[(ChannelId, ChannelId)] = {
connectedClientsF.flatMap { case nodes4: EclairNodes4 =>
val chan1F = openChannel(nodes4.c1, nodes4.c2)
val chan2F = openChannel(nodes4.c3, nodes4.c4)
chan1F.flatMap(chanId1 => chan2F.map(chanId2 => (chanId1,chanId2)))
}
}
def updateIsNotInChannels(channels: Seq[OpenChannelInfo])(
update: ChannelUpdate): Boolean =
!updateIsInChannels(channels)(update)
val gen10F = openedChannelsF.flatMap(_ => bitcoindRpcClientF.flatMap(_.generate(10)))
val nodesReadyForPayments = gen10F.flatMap(_ => connectedClientsF)
val sendPaymentsF = nodesReadyForPayments.flatMap { n4 =>
val p1F = EclairRpcTestUtil.sendPayments(n4.c1, n4.c2)
val p2F = EclairRpcTestUtil.sendPayments(n4.c3, n4.c4)
p1F.flatMap(_ => p2F)
}
val getChannelUpdates = {
(firstFreshClient: EclairRpcClient, secondFreshClient: EclairRpcClient) =>
for {
nodeId <- secondFreshClient.getInfo.map(_.nodeId)
ourOpenChannels <- firstFreshClient
@ -527,28 +666,88 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
})
ourChannelUpdates <- firstFreshClient.allUpdates(nodeId)
allChannelUpdates <- firstFreshClient.allUpdates()
} yield {
assert(ourChannelUpdates.forall(updateIsInChannels(ourOpenChannels)))
assert(allChannelUpdates.exists(updateIsNotInChannels(ourOpenChannels)))
succeed
}
}
val client1F = connectedClientsF.map(_.c1)
val client2F = connectedClientsF.map(_.c2)
sendPaymentsF.flatMap { _ =>
executeSpecificClients(clientF = client1F,
otherClientF = client2F,
test = getChannelUpdates)
}
}
it must "receive gossip messages about channel updates for nodes we do not have a direct channel with" in {
//make sure we see payments outside of our immediate peers
//this is important because these gossip messages contain
//information about channel fees, so we need to get updates
//about the channels for future fee calculations
val sentPaymentF = secondClientF.flatMap { c1 =>
thirdClientF.flatMap { c2 =>
EclairRpcTestUtil.sendPayments(c1,c2)
}
}
val gossipFromPeerWithNoChannel = {
(client: EclairRpcClient, nonPeer: EclairRpcClient) =>
sentPaymentF.flatMap { _ =>
val nodeIdF = client.getNodeURI.map(_.nodeId)
def ourUpdates = nodeIdF.flatMap(nonPeer.allUpdates(_))
def allUpdates = nonPeer.allUpdates()
def checkUpdates() : Future[Boolean] = {
ourUpdates.flatMap( our =>
allUpdates.map { all =>
our != all
}
)
}
val checkedUpatesF: Future[Unit] = AsyncUtil.retryUntilSatisfiedF(
checkUpdates,
duration = 5.seconds,
maxTries = 15)
val hasUpdateP = Promise[Assertion]()
checkedUpatesF.onComplete { t =>
hasUpdateP.success(assert(t.isSuccess))
}
hasUpdateP.future
}
}
//the second client and fourth client aren't directly connected
//which is why i am choosing to use them for this test
executeSpecificClients(
clientF = secondClientF,
otherClientF = fourthClientF,
test = gossipFromPeerWithNoChannel
)
}
it should "detect what network we are on" in {
assert(client.network == LnBitcoinRegTest)
clientF.map(c => assert(c.network == LnBitcoinRegTest))
}
private def hasConnection(
client: EclairRpcClient,
client: Future[EclairRpcClient],
nodeId: NodeId): Future[Assertion] = {
val hasPeersF = client.getPeers.map(_.nonEmpty)
val hasPeersF = clientF.flatMap(_.getPeers.map(_.nonEmpty))
val hasPeersAssertF = hasPeersF.map(h => assert(h))
val isConnectedF = client.isConnected(nodeId)
val isConnectedF = clientF.flatMap(_.isConnected(nodeId))
val isConnectedAssertF =
isConnectedF.map(isConnected => assert(isConnected))
@ -573,32 +772,42 @@ class EclairRpcClientTest extends AsyncFlatSpec with BeforeAndAfterAll {
}
private def openAndConfirmChannel(
client1: EclairRpcClient,
client2: EclairRpcClient,
client1F: Future[EclairRpcClient],
client2F: Future[EclairRpcClient],
amount: CurrencyUnit = Satoshis(Int64(1000000))): Future[ChannelId] = {
val bitcoindRpc = EclairRpcTestUtil.getBitcoindRpc(client1)
val bitcoindRpcF = client1F.map(EclairRpcTestUtil.getBitcoindRpc(_))
val nodeId2F: Future[NodeId] = client2.getInfo.map(_.nodeId)
val nodeId2F: Future[NodeId] = client2F.flatMap(_.getInfo.map(_.nodeId))
val channelIdF: Future[ChannelId] =
nodeId2F.flatMap(nid2 => client1.open(nid2, amount))
val channelIdF: Future[ChannelId] = {
nodeId2F.flatMap { nid2 =>
client1F.flatMap(_.open(nid2, amount))
}
}
//confirm the funding tx
val genF = channelIdF.flatMap(_ => bitcoindRpc.generate(6))
val genF = channelIdF.flatMap { _ =>
bitcoindRpcF.flatMap(_.generate(6))
}
channelIdF.flatMap { cid =>
genF.map { _ =>
genF.flatMap { _ =>
//wait until our peer has put the channel in the
//NORMAL state so we can route payments to them
EclairRpcTestUtil.awaitUntilChannelNormal(client2, cid)
val normalF = client2F.flatMap(c2 => EclairRpcTestUtil.awaitUntilChannelNormal(c2, cid))
cid
normalF.map(_ => cid)
}
}
}
private def updateIsInChannels(channels: Seq[OpenChannelInfo])(
update: ChannelUpdate): Boolean = {
channels.exists(_.shortChannelId == update.shortChannelId)
}
override def afterAll(): Unit = {
clients.result().foreach(EclairRpcTestUtil.shutdown)
TestKit.shutdownActorSystem(system)

View file

@ -4,20 +4,24 @@ import akka.testkit.TestKit
import org.bitcoins.eclair.rpc.client.EclairRpcClient
import org.bitcoins.rpc.BitcoindRpcTestUtil
import org.scalatest.{AsyncFlatSpec, BeforeAndAfterAll}
import org.slf4j.LoggerFactory
class EclairRpcTestUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
private val logger = LoggerFactory.getLogger(getClass)
private implicit val actorSystem: ActorSystem =
ActorSystem.create("EclairRpcTestUtilTest")
private val bitcoindRpc = BitcoindRpcTestUtil.startedBitcoindRpcClient()
private val bitcoindRpcF = {
val cliF = BitcoindRpcTestUtil.startedBitcoindRpcClient()
val blocksF = cliF.flatMap(_.generate(200))
blocksF.flatMap(_ => cliF)
}
private val clients =
Vector.newBuilder[EclairRpcClient]
override def beforeAll: Unit = {
bitcoindRpc.generate(200)
}
override def afterAll: Unit = {
clients.result().foreach(EclairRpcTestUtil.shutdown)
TestKit.shutdownActorSystem(actorSystem)
@ -25,41 +29,22 @@ class EclairRpcTestUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
behavior of "EclairRpcTestUtilTest"
it must "spawn four nodes and create a P2P link between them" in {
val EclairNodes4(first, second, third, fourth) =
EclairRpcTestUtil.createNodeLink(bitcoindRpc)
clients ++= List(first, second, third, fourth)
for {
nodeInfoFirst <- first.getInfo
peersFirst <- first.getPeers
nodeInfoSecond <- second.getInfo
peersSecond <- second.getPeers
nodeInfoThird <- third.getInfo
peersThird <- third.getPeers
nodeInfoFourth <- fourth.getInfo
peersFourth <- fourth.getPeers
} yield {
assert(peersFirst.length == 1)
assert(peersFirst.exists(_.nodeId == nodeInfoSecond.nodeId))
assert(peersSecond.length == 2)
assert(peersSecond.exists(_.nodeId == nodeInfoFirst.nodeId))
assert(peersSecond.exists(_.nodeId == nodeInfoThird.nodeId))
assert(peersThird.length == 2)
assert(peersThird.exists(_.nodeId == nodeInfoSecond.nodeId))
assert(peersThird.exists(_.nodeId == nodeInfoFourth.nodeId))
assert(peersFourth.length == 1)
assert(peersFourth.exists(_.nodeId == nodeInfoThird.nodeId))
}
}
it must "spawn four nodes and create a channel link between them" in {
val EclairNodes4(first, second, third, fourth) =
EclairRpcTestUtil.createNodeLink(bitcoindRpc)
clients ++= List(first, second, third, fourth)
val nodes4F = bitcoindRpcF.flatMap { bitcoindRpc =>
val nodes = EclairRpcTestUtil.createNodeLink(bitcoindRpc)
nodes.map { n4 =>
clients ++= List(n4.c1, n4.c2, n4.c3, n4.c4)
}
nodes
}
nodes4F.flatMap { n4 =>
val first = n4.c1
val second = n4.c2
val third = n4.c3
val fourth = n4.c4
for {
nodeInfoFirst <- first.getInfo
@ -86,4 +71,6 @@ class EclairRpcTestUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
assert(channelsFourth.exists(_.nodeId == nodeInfoThird.nodeId))
}
}
}
}

View file

@ -17,7 +17,7 @@ object Deps {
val nativeLoaderV = "2.3.2"
val typesafeConfigV = "1.3.3"
val bitcoinsV = "0.0.4.1-SNAPSHOT"
val bitcoinsV = "236041-1549541584036-SNAPSHOT"
}
object Compile {

View file

@ -14,24 +14,20 @@ import org.bitcoins.core.currency.{Bitcoins, Satoshis}
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader, MerkleBlock}
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.{
Transaction,
TransactionInput,
TransactionOutPoint
}
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionInput, TransactionOutPoint}
import org.bitcoins.core.protocol.{BitcoinAddress, P2PKHAddress}
import org.bitcoins.core.util.{BitcoinSLogger, BitcoinSUtil}
import org.bitcoins.rpc.client.RpcOpts.AddressType
import org.bitcoins.rpc.config.BitcoindInstance
import org.bitcoins.rpc.jsonmodels._
import org.bitcoins.rpc.serializers.JsonSerializers._
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.rpc.util.{AsyncUtil, RpcUtil}
import play.api.libs.json._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.sys.process._
import scala.util.Try
import scala.util.{Failure, Success, Try}
class BitcoindRpcClient(val instance: BitcoindInstance)(
implicit
@ -1108,8 +1104,10 @@ class BitcoindRpcClient(val instance: BitcoindInstance)(
val payloadF: Future[JsValue] = responseF.flatMap(getPayload)
payloadF.map { payload =>
parseResult((payload \ resultKey).validate[T], payload)
payloadF.flatMap { payload =>
val jsResult = (payload \ resultKey).validate[T]
val result = parseResult(jsResult, payload)
Future.fromTry(result)
}
}
@ -1117,21 +1115,21 @@ class BitcoindRpcClient(val instance: BitcoindInstance)(
implicit val rpcErrorReads: Reads[RpcError] = Json.reads[RpcError]
// Should both logging and throwing be happening?
private def parseResult[T](result: JsResult[T], json: JsValue): T = {
private def parseResult[T](result: JsResult[T], json: JsValue): Try[T] = {
checkUnitError[T](result, json)
result match {
case res: JsSuccess[T] => res.value
case res: JsSuccess[T] => Success(res.value)
case res: JsError =>
(json \ errorKey).validate[RpcError] match {
case err: JsSuccess[RpcError] =>
logger.error(s"Error ${err.value.code}: ${err.value.message}")
throw new RuntimeException(
s"Error ${err.value.code}: ${err.value.message}")
Failure(new RuntimeException(
s"Error ${err.value.code}: ${err.value.message}"))
case _: JsError =>
logger.error(JsError.toJson(res).toString())
throw new IllegalArgumentException(
s"Could not parse JsResult: ${(json \ resultKey).get}")
Failure(new IllegalArgumentException(
s"Could not parse JsResult: ${(json \ resultKey).get}"))
}
}
}
@ -1190,13 +1188,37 @@ class BitcoindRpcClient(val instance: BitcoindInstance)(
HttpCredentials.createBasicHttpCredentials(username, password))
}
def start(): String = {
/** Starts bitcoind on the local system.
* @return a future that completes when bitcoind is fully started.
* This future times out after 60 seconds if the client
* cannot be started
*/
def start(): Future[Unit] = {
val cmd = List("bitcoind",
"-datadir=" + instance.authCredentials.datadir,
"-rpcport=" + instance.rpcUri.getPort,
"-port=" + instance.uri.getPort)
logger.debug(s"starting bitcoind")
val _ = Process(cmd).run()
"Started bitcoind!"
def isStartedF(): Future[Boolean] = {
val started: Promise[Boolean] = Promise()
getBlockCount.onComplete {
case Success(_) => started.success(true)
case Failure(_) => started.success(false)
}
started.future
}
val started = AsyncUtil.retryUntilSatisfiedF(
() => isStartedF,
duration = 1.seconds,
maxTries = 60)
started.map(_ => logger.debug(s"started bitcoind"))
started
}
}

View file

@ -144,22 +144,27 @@ class BitcoindRpcClientTest
val servers = Vector(walletClient, client, otherClient, pruneClient)
logger.info("Bitcoin servers starting")
BitcoindRpcTestUtil.startServers(servers)
val startedF = BitcoindRpcTestUtil.startServers(servers)
val _ = startedF.flatMap { _ =>
logger.info(s"Adding node...")
client.addNode(otherClient.getDaemon.uri, "add")
}
Await.result(
walletClient.encryptWallet(password).map { msg =>
val encryptedStartAndShutdown = {
startedF.flatMap { _ =>
walletClient.encryptWallet(password).flatMap { msg =>
logger.info(msg)
RpcUtil.awaitServerShutdown(walletClient)
logger.debug(walletClient.isStarted.toString)
// Very rarely, this may fail if bitcoind does not ping but hasn't yet released its locks
walletClient.start()
logger.info("Bitcoin server restarting")
RpcUtil.awaitServer(walletClient)
},
30.seconds
)
}
}
}
Await.result(encryptedStartAndShutdown, 30.seconds)
logger.info("Mining some blocks")
Await.result(client.generate(200), 30.seconds)

View file

@ -111,15 +111,12 @@ class RpcUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
val instance = BitcoindRpcTestUtil.instance()
val client = new BitcoindRpcClient(instance)
client.start()
RpcUtil.awaitCondition(client.isStarted)
assert(client.isStarted)
client.stop()
RpcUtil.awaitServerShutdown(client)
assert(!client.isStarted)
val startedF = client.start()
val t = Try(Await.result(client.getNetworkInfo, 1000.milliseconds))
assert(t.isFailure)
startedF.map { _ =>
client.stop()
succeed
}
}
it should "be able to create a connected node pair with 100 blocks and then delete them" in {

View file

@ -7,11 +7,7 @@ import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import org.bitcoins.core.config.RegTest
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.protocol.ln.channel.{
ChannelId,
ChannelState,
FundedChannelId
}
import org.bitcoins.core.protocol.ln.channel.{ChannelId, ChannelState, FundedChannelId}
import org.bitcoins.core.protocol.ln.currency.MilliSatoshis
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.eclair.rpc.client.EclairRpcClient
@ -21,9 +17,10 @@ import org.bitcoins.rpc.BitcoindRpcTestUtil
import org.bitcoins.rpc.client.BitcoindRpcClient
import org.bitcoins.rpc.config.{BitcoindInstance, ZmqConfig}
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.util.AsyncUtil
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future}
/**
* @define nodeLinkDoc
@ -147,22 +144,22 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
}
def randomEclairClient(bitcoindRpcOpt: Option[BitcoindRpcClient] = None)(
implicit system: ActorSystem): EclairRpcClient = {
val bitcoindRpc = {
implicit system: ActorSystem): Future[EclairRpcClient] = {
import system.dispatcher
val bitcoindRpcF: Future[BitcoindRpcClient] = {
if (bitcoindRpcOpt.isDefined) {
bitcoindRpcOpt.get
Future.successful(bitcoindRpcOpt.get)
} else {
BitcoindRpcTestUtil.startedBitcoindRpcClient()
}
}
val randInstance = randomEclairInstance(bitcoindRpc)
val eclairRpc = new EclairRpcClient(randInstance)
eclairRpc.start()
val randInstanceF = bitcoindRpcF.map(randomEclairInstance(_))
val eclairRpcF = randInstanceF.map(i => new EclairRpcClient(i))
RpcUtil.awaitCondition(() => eclairRpc.isStarted(), duration = 1.seconds)
val startedF = eclairRpcF.flatMap(_.start())
eclairRpc
startedF.flatMap(_ => eclairRpcF)
}
def cannonicalEclairClient()(
@ -195,19 +192,19 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
* @param chanId
*/
def awaitUntilChannelNormal(client: EclairRpcClient, chanId: ChannelId)(
implicit system: ActorSystem): Unit = {
implicit system: ActorSystem): Future[Unit] = {
awaitUntilChannelState(client, chanId, ChannelState.NORMAL)
}
def awaitUntilChannelClosing(client: EclairRpcClient, chanId: ChannelId)(
implicit system: ActorSystem): Unit = {
implicit system: ActorSystem): Future[Unit] = {
awaitUntilChannelState(client, chanId, ChannelState.CLOSING)
}
private def awaitUntilChannelState(
client: EclairRpcClient,
chanId: ChannelId,
state: ChannelState)(implicit system: ActorSystem): Unit = {
state: ChannelState)(implicit system: ActorSystem): Future[Unit] = {
logger.debug(s"Awaiting ${chanId} to enter ${state} state")
def isState(): Future[Boolean] = {
val chanF = client.channel(chanId)
@ -220,23 +217,29 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
}(system.dispatcher)
}
RpcUtil.awaitConditionF(conditionF = () => isState(), duration = 1.seconds)
logger.debug(s"${chanId} has successfully entered the ${state} state")
()
AsyncUtil.retryUntilSatisfiedF(conditionF = () => isState(), duration = 1.seconds)
}
private def createNodeLink(
bitcoindRpcClient: Option[BitcoindRpcClient],
channelAmount: MilliSatoshis)(
implicit actorSystem: ActorSystem): EclairNodes4 = {
implicit actorSystem: ActorSystem): Future[EclairNodes4] = {
implicit val ec: ExecutionContext = actorSystem.dispatcher
val internalBitcoind = bitcoindRpcClient.getOrElse(
val internalBitcoindF = {
if (bitcoindRpcClient.isDefined) {
Future.successful(bitcoindRpcClient.get)
} else {
BitcoindRpcTestUtil.startedBitcoindRpcClient()
)
}
}
val (first, second) = createNodePair(Some(internalBitcoind))
val (third, fourth) = createNodePair(Some(internalBitcoind))
val pair1: Future[(EclairRpcClient, EclairRpcClient)] = {
internalBitcoindF.flatMap(b => createNodePair(Some(b)))
}
val pair2: Future[(EclairRpcClient,EclairRpcClient)] = {
internalBitcoindF.flatMap(b => createNodePair(Some(b)))
}
def open(
c1: EclairRpcClient,
@ -247,16 +250,45 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
pushMSat = MilliSatoshis(channelAmount.toLong / 2))
}
EclairRpcTestUtil.connectLNNodes(second, third)
val nodeVecF: Future[Vector[EclairRpcClient]] = {
pair1.flatMap { case (first,second) =>
pair2.flatMap {
case (third,fourth) =>
val openF = Future.sequence(
List(open(first, second), open(second, third), open(third, fourth)))
// we need to make sure the second and third nodes are connected
val connected = EclairRpcTestUtil.connectLNNodes(second, third)
connected.map { _ =>
Vector(first,second,third,fourth)
}
Await.result(openF, 60.seconds)
}
}
}
internalBitcoind.generate(3)
EclairNodes4(first, second, third, fourth)
val openChannelsFNested: Future[List[Future[FundedChannelId]]] = {
nodeVecF.map { nodeVec =>
List(open(nodeVec.head, nodeVec(1)),
open(nodeVec(1), nodeVec(2)),
open(nodeVec(2), nodeVec(3)))
}
}
val openChannelsF :Future[List[FundedChannelId]] = {
openChannelsFNested.flatMap(Future.sequence(_))
}
val genBlocksF = openChannelsF.flatMap { _ =>
internalBitcoindF.flatMap(_.generate(3))
}
genBlocksF.flatMap { _ =>
nodeVecF.map { nodeVec =>
EclairNodes4(nodeVec.head, nodeVec(1), nodeVec(2), nodeVec(3))
}
}
}
/**
@ -267,7 +299,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
*/
def createNodeLink(
bitcoindRpcClient: BitcoindRpcClient
)(implicit actorSystem: ActorSystem): EclairNodes4 = {
)(implicit actorSystem: ActorSystem): Future[EclairNodes4] = {
createNodeLink(Some(bitcoindRpcClient), DEFAULT_CHANNEL_MSAT_AMT)
}
@ -280,7 +312,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
def createNodeLink(
bitcoindRpcClient: BitcoindRpcClient,
channelAmount: MilliSatoshis)(
implicit actorSystem: ActorSystem): EclairNodes4 = {
implicit actorSystem: ActorSystem): Future[EclairNodes4] = {
createNodeLink(Some(bitcoindRpcClient), channelAmount)
}
@ -290,7 +322,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
* @return A 4-tuple of the created nodes' respective
* [[org.bitcoins.eclair.rpc.client.EclairRpcClient EclairRpcClient]]
*/
def createNodeLink()(implicit actorSystem: ActorSystem): EclairNodes4 = {
def createNodeLink()(implicit actorSystem: ActorSystem): Future[EclairNodes4] = {
createNodeLink(None, DEFAULT_CHANNEL_MSAT_AMT)
}
@ -302,7 +334,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
*/
def createNodeLink(
channelAmount: MilliSatoshis
)(implicit actorSystem: ActorSystem): EclairNodes4 = {
)(implicit actorSystem: ActorSystem): Future[EclairNodes4] = {
createNodeLink(None, channelAmount)
}
@ -311,42 +343,50 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
* respective [[org.bitcoins.eclair.rpc.client.EclairRpcClient EclairRpcClient]]s
*/
def createNodePair(bitcoindRpcClientOpt: Option[BitcoindRpcClient])(
implicit system: ActorSystem): (EclairRpcClient, EclairRpcClient) = {
val bitcoindRpcClient = {
bitcoindRpcClientOpt.getOrElse(
BitcoindRpcTestUtil.startedBitcoindRpcClient())
implicit system: ActorSystem): Future[(EclairRpcClient, EclairRpcClient)] = {
import system.dispatcher
val bitcoindRpcClientF: Future[BitcoindRpcClient] = {
if (bitcoindRpcClientOpt.isDefined) {
Future.successful(bitcoindRpcClientOpt.get)
} else {
BitcoindRpcTestUtil.startedBitcoindRpcClient()
}
}
val e1Instance = EclairRpcTestUtil.eclairInstance(bitcoindRpcClient)
val e2Instance = EclairRpcTestUtil.eclairInstance(bitcoindRpcClient)
val client = new EclairRpcClient(e1Instance)
val otherClient = new EclairRpcClient(e2Instance)
val e1InstanceF = bitcoindRpcClientF.map(EclairRpcTestUtil.eclairInstance(_))
val e2InstanceF = bitcoindRpcClientF.map(EclairRpcTestUtil.eclairInstance(_))
val clientF = e1InstanceF.flatMap { e1 =>
val e = new EclairRpcClient(e1)
logger.debug(
s"Temp eclair directory created ${client.getDaemon.authCredentials.datadir}")
s"Temp eclair directory created ${e.getDaemon.authCredentials.datadir}")
e.start().map(_ => e)
}
val otherClientF = e2InstanceF.flatMap { e2 =>
val e = new EclairRpcClient(e2)
logger.debug(
s"Temp eclair directory created ${otherClient.getDaemon.authCredentials.datadir}")
client.start()
otherClient.start()
RpcUtil.awaitCondition(condition = () => client.isStarted(),
duration = 1.second)
RpcUtil.awaitCondition(condition = () => otherClient.isStarted(),
duration = 1.second)
s"Temp eclair directory created ${e.getDaemon.authCredentials.datadir}")
e.start().map(_ => e)
}
logger.debug(s"Both clients started")
connectLNNodes(client, otherClient)
val connectedLnF: Future[(EclairRpcClient, EclairRpcClient)] = clientF.flatMap { c1 =>
otherClientF.flatMap { c2 =>
val connectedF = connectLNNodes(c1, c2)
connectedF.map { _ =>
(c1,c2)
}
}
}
(client, otherClient)
connectedLnF
}
def connectLNNodes(client: EclairRpcClient, otherClient: EclairRpcClient)(
implicit
system: ActorSystem): Unit = {
system: ActorSystem): Future[Unit] = {
implicit val dispatcher = system.dispatcher
val infoF = otherClient.getInfo
@ -365,11 +405,14 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
}
logger.debug(s"Awaiting connection between clients")
RpcUtil.awaitConditionF(conditionF = () => isConnected(),
val connected = RpcUtil.retryUntilSatisfiedF(
conditionF = () => isConnected(),
duration = 1.second)
logger.debug(s"Successfully connected two clients")
()
connected.map(_ => logger.debug(s"Successfully connected two clients"))
connected
}
/**
@ -425,9 +468,9 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
val opened = {
gen.flatMap { _ =>
fundedChannelIdF.map { fcid =>
awaitChannelOpened(n1, fcid)
fcid
fundedChannelIdF.flatMap { fcid =>
val chanOpenF = awaitChannelOpened(n1, fcid)
chanOpenF.map(_ => fcid)
}
}
}
@ -440,7 +483,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
}
def awaitChannelOpened(client1: EclairRpcClient, chanId: ChannelId)(
implicit system: ActorSystem): Unit = {
implicit system: ActorSystem): Future[Unit] = {
EclairRpcTestUtil.awaitUntilChannelNormal(client1, chanId)
}
@ -464,14 +507,13 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
/** Shuts down an eclair daemon and the bitcoind daemon it is associated with */
def shutdown(eclairRpcClient: EclairRpcClient)(
implicit system: ActorSystem): Unit = {
implicit system: ActorSystem): Future[Unit] = {
import system.dispatcher
val bitcoindRpc = getBitcoindRpc(eclairRpcClient)
eclairRpcClient.stop()
bitcoindRpc.stop()
RpcUtil.awaitServerShutdown(bitcoindRpc)
bitcoindRpc.stop().map(_ => ())
}
}

View file

@ -10,15 +10,10 @@ import org.bitcoins.core.config.RegTest
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.rpc.client.BitcoindRpcClient
import org.bitcoins.rpc.config.{
BitcoindAuthCredentials,
BitcoindInstance,
ZmqConfig
}
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.rpc.config.{BitcoindAuthCredentials, BitcoindInstance, ZmqConfig}
import org.bitcoins.rpc.util.{AsyncUtil, RpcUtil}
import scala.collection.immutable.Map
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
@ -118,9 +113,10 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
}
def startServers(servers: Vector[BitcoindRpcClient])(
implicit system: ActorSystem): Unit = {
servers.foreach(_.start())
servers.foreach(RpcUtil.awaitServer(_))
implicit ec: ExecutionContext): Future[Unit] = {
val startedServers = servers.map(_.start())
Future.sequence(startedServers).map(_ => ())
}
def deleteTmpDir(dir: File): Boolean = {
@ -288,26 +284,34 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
def startedBitcoindRpcClient(
instance: BitcoindInstance = BitcoindRpcTestUtil.instance())(
implicit system: ActorSystem): BitcoindRpcClient = {
implicit system: ActorSystem): Future[BitcoindRpcClient] = {
implicit val ec = system.dispatcher
//start the bitcoind instance so eclair can properly use it
val rpc = new BitcoindRpcClient(instance)(system)
rpc.start()
logger.debug(s"Starting bitcoind at ${instance.authCredentials.datadir}")
RpcUtil.awaitServer(rpc)
val startedF = rpc.start()
val blocksToGenerate = 102
//fund the wallet by generating 102 blocks, need this to get over coinbase maturity
val _ = rpc.generate(blocksToGenerate)
def isBlocksGenerated(): Future[Boolean] = {
rpc.getBlockCount.map(_ >= blocksToGenerate)
val generatedF = startedF.flatMap { _ =>
rpc.generate(blocksToGenerate)
}
RpcUtil.awaitConditionF(() => isBlocksGenerated())
def isBlocksGenerated(): Future[Boolean] = {
rpc.getBlockCount.map { count =>
count >= blocksToGenerate
}
}
rpc
val blocksGeneratedF = generatedF.flatMap { _ =>
AsyncUtil.retryUntilSatisfiedF(
() => isBlocksGenerated,
duration = 1.seconds
)
}
val result = blocksGeneratedF.map(_ => rpc)
result
}
}