mirror of
https://github.com/ACINQ/eclair.git
synced 2025-01-19 05:33:59 +01:00
Batch requests to bitcoind json-rpc api (#429)
* added a batching client for bitcoind jsonrpc api * convert json-rpc errors to exceptions in batching client
This commit is contained in:
parent
b75962a57f
commit
32d634c7f7
@ -12,7 +12,7 @@ import com.typesafe.config.{Config, ConfigFactory}
|
||||
import fr.acinq.bitcoin.{BinaryData, Block}
|
||||
import fr.acinq.eclair.NodeParams.{BITCOIND, BITCOINJ, ELECTRUM}
|
||||
import fr.acinq.eclair.api.{GetInfoResponse, Service}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BatchingBitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
|
||||
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
|
||||
import fr.acinq.eclair.blockchain.bitcoinj.{BitcoinjKit, BitcoinjWallet, BitcoinjWatcher}
|
||||
@ -67,11 +67,11 @@ class Setup(datadir: File, overrideDefaults: Config = ConfigFactory.empty(), act
|
||||
|
||||
val bitcoin = nodeParams.watcherType match {
|
||||
case BITCOIND =>
|
||||
val bitcoinClient = new ExtendedBitcoinClient(new BitcoinJsonRPCClient(
|
||||
val bitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(new BasicBitcoinJsonRPCClient(
|
||||
user = config.getString("bitcoind.rpcuser"),
|
||||
password = config.getString("bitcoind.rpcpassword"),
|
||||
host = config.getString("bitcoind.host"),
|
||||
port = config.getInt("bitcoind.rpcport")))
|
||||
port = config.getInt("bitcoind.rpcport"))))
|
||||
val future = for {
|
||||
json <- bitcoinClient.rpcClient.invoke("getblockchaininfo").recover { case _ => throw BitcoinRPCConnectionException }
|
||||
// Make sure wallet support is enabled in bitcoind.
|
||||
|
@ -0,0 +1,68 @@
|
||||
package fr.acinq.eclair.blockchain.bitcoind.rpc
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.marshalling.Marshal
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
|
||||
import akka.http.scaladsl.unmarshalling.Unmarshal
|
||||
import akka.stream.scaladsl.{Keep, Sink, Source}
|
||||
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
|
||||
import de.heikoseeberger.akkahttpjson4s.Json4sSupport._
|
||||
import org.json4s.JsonAST.JValue
|
||||
import org.json4s.{DefaultFormats, jackson}
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
class BasicBitcoinJsonRPCClient(user: String, password: String, host: String = "127.0.0.1", port: Int = 8332, ssl: Boolean = false)(implicit system: ActorSystem) extends BitcoinJsonRPCClient {
|
||||
|
||||
val scheme = if (ssl) "https" else "http"
|
||||
val uri = Uri(s"$scheme://$host:$port")
|
||||
implicit val serialization = jackson.Serialization
|
||||
implicit val formats = DefaultFormats
|
||||
val log = Logging(system, classOf[BasicBitcoinJsonRPCClient])
|
||||
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val httpClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
|
||||
|
||||
val queueSize = 256
|
||||
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.dropNew)
|
||||
.via(httpClientFlow)
|
||||
.toMat(Sink.foreach({
|
||||
case ((Success(resp), p)) => p.success(resp)
|
||||
case ((Failure(e), p)) => p.failure(e)
|
||||
}))(Keep.left)
|
||||
.run()
|
||||
|
||||
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
|
||||
val responsePromise = Promise[HttpResponse]()
|
||||
queue.offer(request -> responsePromise).flatMap {
|
||||
case QueueOfferResult.Enqueued => responsePromise.future
|
||||
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
|
||||
case QueueOfferResult.Failure(ex) => Future.failed(ex)
|
||||
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
|
||||
}
|
||||
}
|
||||
|
||||
override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JValue] =
|
||||
invoke(Seq(JsonRPCRequest(method = method, params = params))).map(l => jsonResponse2Exception(l.head).result)
|
||||
|
||||
def jsonResponse2Exception(jsonRPCResponse: JsonRPCResponse): JsonRPCResponse = jsonRPCResponse match {
|
||||
case JsonRPCResponse(_, Some(error), _) => throw JsonRPCError(error)
|
||||
case o => o
|
||||
}
|
||||
|
||||
def invoke(requests: Seq[JsonRPCRequest])(implicit ec: ExecutionContext): Future[Seq[JsonRPCResponse]] =
|
||||
for {
|
||||
entity <- Marshal(requests).to[RequestEntity]
|
||||
_ = log.debug("sending rpc request with body={}", entity)
|
||||
httpRes <- queueRequest(HttpRequest(uri = "/", method = HttpMethods.POST).addHeader(Authorization(BasicHttpCredentials(user, password))).withEntity(entity))
|
||||
jsonRpcRes <- Unmarshal(httpRes).to[Seq[JsonRPCResponse]].recover {
|
||||
case t: Throwable if httpRes.status == StatusCodes.Unauthorized => throw new RuntimeException("bitcoind replied with 401/Unauthorized (bad user/password?)", t)
|
||||
}
|
||||
} yield jsonRpcRes
|
||||
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package fr.acinq.eclair.blockchain.bitcoind.rpc
|
||||
|
||||
import akka.actor.{ActorSystem, Props}
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import org.json4s.JsonAST
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
class BatchingBitcoinJsonRPCClient(rpcClient: BasicBitcoinJsonRPCClient)(implicit system: ActorSystem, ec: ExecutionContext) extends BitcoinJsonRPCClient {
|
||||
|
||||
implicit val timeout = Timeout(1 hour)
|
||||
|
||||
val batchingClient = system.actorOf(Props(new BatchingClient(rpcClient)), name = "batching-client")
|
||||
|
||||
override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JsonAST.JValue] =
|
||||
(batchingClient ? JsonRPCRequest(method = method, params = params)).mapTo[JsonAST.JValue]
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package fr.acinq.eclair.blockchain.bitcoind.rpc
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Status}
|
||||
import akka.pattern.pipe
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BatchingClient.Pending
|
||||
|
||||
import scala.collection.immutable.Queue
|
||||
|
||||
class BatchingClient(rpcClient: BasicBitcoinJsonRPCClient) extends Actor with ActorLogging {
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
override def receive: Receive = {
|
||||
case request: JsonRPCRequest =>
|
||||
// immediately process isolated request
|
||||
process(queue = Queue(Pending(request, sender)))
|
||||
}
|
||||
|
||||
def waiting(queue: Queue[Pending], processing: Seq[Pending]): Receive = {
|
||||
case request: JsonRPCRequest =>
|
||||
// there is already a batch in flight, just add this request to the queue
|
||||
context become waiting(queue :+ Pending(request, sender), processing)
|
||||
|
||||
case responses: Seq[JsonRPCResponse]@unchecked =>
|
||||
log.debug(s"got ${responses.size} responses")
|
||||
// let's send back answers to the requestors
|
||||
require(responses.size == processing.size, s"responses=${responses.size} != processing=${processing.size}")
|
||||
responses.zip(processing).foreach {
|
||||
case (JsonRPCResponse(result, None, _), Pending(_, requestor)) => requestor ! result
|
||||
case (JsonRPCResponse(_, Some(error), _), Pending(_, requestor)) => requestor ! Status.Failure(JsonRPCError(error))
|
||||
}
|
||||
process(queue)
|
||||
|
||||
case s@Status.Failure(t) =>
|
||||
log.error(t, s"got exception for batch of ${processing.size} requests ")
|
||||
// let's fail all requests
|
||||
processing.foreach { case Pending(_, requestor) => requestor ! s }
|
||||
process(queue)
|
||||
}
|
||||
|
||||
def process(queue: Queue[Pending]) = {
|
||||
// do we have queued requests?
|
||||
if (queue.isEmpty) {
|
||||
log.debug(s"no more requests, going back to idle")
|
||||
context become receive
|
||||
} else {
|
||||
val (batch, rest) = queue.splitAt(BatchingClient.BATCH_SIZE)
|
||||
log.debug(s"sending ${batch.size} requests (queue=${queue.size})")
|
||||
rpcClient.invoke(batch.map(_.request)) pipeTo self
|
||||
context become waiting(rest, batch)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object BatchingClient {
|
||||
|
||||
val BATCH_SIZE = 50
|
||||
|
||||
case class Pending(request: JsonRPCRequest, requestor: ActorRef)
|
||||
|
||||
}
|
@ -2,84 +2,19 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc
|
||||
|
||||
import java.io.IOException
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.marshalling.Marshal
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
|
||||
import akka.http.scaladsl.unmarshalling.Unmarshal
|
||||
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
|
||||
import akka.stream.scaladsl.{Keep, Sink, Source}
|
||||
import de.heikoseeberger.akkahttpjson4s.Json4sSupport._
|
||||
import org.json4s.JsonAST.JValue
|
||||
import org.json4s.{DefaultFormats, jackson}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise}
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.util.{Failure, Success}
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait BitcoinJsonRPCClient {
|
||||
|
||||
def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JValue]
|
||||
|
||||
}
|
||||
|
||||
// @formatter:off
|
||||
case class JsonRPCRequest(jsonrpc: String = "1.0", id: String = "scala-client", method: String, params: Seq[Any])
|
||||
case class Error(code: Int, message: String)
|
||||
case class JsonRPCResponse(result: JValue, error: Option[Error], id: String)
|
||||
case class JsonRPCError(error: Error) extends IOException(s"${error.message} (code: ${error.code})")
|
||||
// @formatter:on
|
||||
|
||||
class BitcoinJsonRPCClient(user: String, password: String, host: String = "127.0.0.1", port: Int = 8332, ssl: Boolean = false)(implicit system: ActorSystem) {
|
||||
|
||||
val scheme = if (ssl) "https" else "http"
|
||||
val uri = Uri(s"$scheme://$host:$port")
|
||||
implicit val serialization = jackson.Serialization
|
||||
implicit val formats = DefaultFormats
|
||||
val log = Logging(system, classOf[BitcoinJsonRPCClient])
|
||||
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val httpClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)
|
||||
|
||||
val queueSize = 32768
|
||||
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.dropNew)
|
||||
.via(httpClientFlow)
|
||||
.toMat(Sink.foreach({
|
||||
case ((Success(resp), p)) => p.success(resp)
|
||||
case ((Failure(e), p)) => p.failure(e)
|
||||
}))(Keep.left)
|
||||
.run()
|
||||
|
||||
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
|
||||
val responsePromise = Promise[HttpResponse]()
|
||||
queue.offer(request -> responsePromise).flatMap {
|
||||
case QueueOfferResult.Enqueued => responsePromise.future
|
||||
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
|
||||
case QueueOfferResult.Failure(ex) => Future.failed(ex)
|
||||
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
|
||||
}
|
||||
}
|
||||
|
||||
def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JValue] =
|
||||
for {
|
||||
entity <- Marshal(JsonRPCRequest(method = method, params = params)).to[RequestEntity]
|
||||
_ = log.debug("sending rpc request with body={}", entity)
|
||||
httpRes <- queueRequest(HttpRequest(uri = "/", method = HttpMethods.POST).addHeader(Authorization(BasicHttpCredentials(user, password))).withEntity(entity))
|
||||
jsonRpcRes <- Unmarshal(httpRes).to[JsonRPCResponse].map {
|
||||
case JsonRPCResponse(_, Some(error), _) => throw JsonRPCError(error)
|
||||
case o => o
|
||||
} recover {
|
||||
case t: Throwable if httpRes.status == StatusCodes.Unauthorized => throw new RuntimeException("bitcoind replied with 401/Unauthorized (bad user/password?)", t)
|
||||
}
|
||||
} yield jsonRpcRes.result
|
||||
|
||||
def invoke(request: Seq[(String, Seq[Any])])(implicit ec: ExecutionContext): Future[Seq[JValue]] =
|
||||
for {
|
||||
entity <- Marshal(request.map(r => JsonRPCRequest(method = r._1, params = r._2))).to[RequestEntity]
|
||||
_ = log.debug("sending rpc request with body={}", entity)
|
||||
httpRes <- queueRequest(HttpRequest(uri = "/", method = HttpMethods.POST).addHeader(Authorization(BasicHttpCredentials(user, password))).withEntity(entity))
|
||||
jsonRpcRes <- Unmarshal(httpRes).to[Seq[JsonRPCResponse]].map {
|
||||
//case JsonRPCResponse(_, Some(error), _) => throw JsonRPCError(error)
|
||||
case o => o
|
||||
} recover {
|
||||
case t: Throwable if httpRes.status == StatusCodes.Unauthorized => throw new RuntimeException("bitcoind replied with 401/Unauthorized (bad user/password?)", t)
|
||||
}
|
||||
} yield jsonRpcRes.map(_.result)
|
||||
|
||||
}
|
||||
// @formatter:on
|
@ -99,7 +99,6 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
|
||||
json <- rpcClient.invoke("gettxout", txId, outputIndex, includeMempool)
|
||||
} yield json != JNull
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param txId transaction id
|
||||
@ -140,36 +139,28 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
|
||||
case JInt(count) => count.toLong
|
||||
}
|
||||
|
||||
def getParallel(awaiting: Seq[ChannelAnnouncement]): Future[ParallelGetResponse] = {
|
||||
def get(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[IndividualResult] = {
|
||||
case class TxCoordinate(blockHeight: Int, txIndex: Int, outputIndex: Int)
|
||||
|
||||
val coordinates = awaiting.map {
|
||||
case c =>
|
||||
val (blockHeight, txIndex, outputIndex) = fromShortId(c.shortChannelId)
|
||||
TxCoordinate(blockHeight, txIndex, outputIndex)
|
||||
}.zipWithIndex
|
||||
|
||||
import ExecutionContext.Implicits.global
|
||||
implicit val formats = org.json4s.DefaultFormats
|
||||
val (blockHeight, txIndex, outputIndex) = fromShortId(c.shortChannelId)
|
||||
val coordinates = TxCoordinate(blockHeight, txIndex, outputIndex)
|
||||
|
||||
for {
|
||||
blockHashes: Seq[String] <- rpcClient.invoke(coordinates.map(coord => ("getblockhash", coord._1.blockHeight :: Nil))).map(_.map(_.extractOrElse[String]("00" * 32)))
|
||||
txids: Seq[String] <- rpcClient.invoke(blockHashes.map(h => ("getblock", h :: Nil)))
|
||||
.map(_.zipWithIndex)
|
||||
.map(_.map {
|
||||
case (json, idx) => Try {
|
||||
val JArray(txs) = json \ "tx"
|
||||
txs(coordinates(idx)._1.txIndex).extract[String]
|
||||
} getOrElse ("00" * 32)
|
||||
})
|
||||
txs <- rpcClient.invoke(txids.map(txid => ("getrawtransaction", txid :: Nil))).map(_.map {
|
||||
case JString(raw) => Some(Transaction.read(raw))
|
||||
case _ => None
|
||||
})
|
||||
unspent <- rpcClient.invoke(txids.zipWithIndex.map(txid => ("gettxout", txid._1 :: coordinates(txid._2)._1.outputIndex :: true :: Nil))).map(_.map(_ != JNull))
|
||||
} yield ParallelGetResponse(awaiting.zip(txs.zip(unspent)).map(x => IndividualResult(x._1, x._2._1, x._2._2)))
|
||||
blockHash: String <- rpcClient.invoke("getblockhash", coordinates.blockHeight).map(_.extractOrElse[String]("00" * 32))
|
||||
txid: String <- rpcClient.invoke("getblock", blockHash).map {
|
||||
case json => Try {
|
||||
val JArray(txs) = json \ "tx"
|
||||
txs(coordinates.txIndex).extract[String]
|
||||
} getOrElse ("00" * 32)
|
||||
}
|
||||
tx <- getRawTransaction(txid)
|
||||
unspent <- isTransactionOutputSpendable(txid, coordinates.outputIndex, includeMempool = true)
|
||||
} yield IndividualResult(c, Some(Transaction.read(tx)), unspent)
|
||||
}
|
||||
|
||||
def getParallel(awaiting: Seq[ChannelAnnouncement])(implicit ec: ExecutionContext): Future[ParallelGetResponse] =
|
||||
Future.sequence(awaiting.map(get)).map(ParallelGetResponse)
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the list of bitcoin addresses for which the wallet has UTXOs
|
||||
|
@ -3,7 +3,7 @@ package fr.acinq.eclair
|
||||
import akka.actor.ActorSystem
|
||||
import fr.acinq.bitcoin.{Block, Transaction}
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -11,7 +11,7 @@ import scala.concurrent.{ExecutionContext, Future}
|
||||
/**
|
||||
* Created by PM on 26/04/2016.
|
||||
*/
|
||||
class TestBitcoinClient()(implicit system: ActorSystem) extends ExtendedBitcoinClient(new BitcoinJsonRPCClient("", "", "", 0)) {
|
||||
class TestBitcoinClient()(implicit system: ActorSystem) extends ExtendedBitcoinClient(new BasicBitcoinJsonRPCClient("", "", "", 0)) {
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
|
@ -11,7 +11,7 @@ import com.typesafe.config.ConfigFactory
|
||||
import fr.acinq.bitcoin._
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.eclair.Kit
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.integration.IntegrationSpec
|
||||
import grizzled.slf4j.Logging
|
||||
import org.json4s.DefaultFormats
|
||||
@ -48,7 +48,7 @@ class ExtendedBitcoinClientSpec extends TestKit(ActorSystem("test")) with FunSui
|
||||
Files.copy(classOf[IntegrationSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoind = s"$PATH_BITCOIND -datadir=$PATH_BITCOIND_DATADIR".run()
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method) => bitcoinrpcclient.invoke(method) pipeTo sender
|
||||
|
@ -10,7 +10,7 @@ import akka.testkit.{TestKit, TestProbe}
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import fr.acinq.bitcoin.{MilliBtc, Satoshi, Script}
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinJsonRPCClient
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BasicBitcoinJsonRPCClient
|
||||
import fr.acinq.eclair.randomKey
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import grizzled.slf4j.Logging
|
||||
@ -35,7 +35,7 @@ class BitcoinCoreWalletSpec extends TestKit(ActorSystem("test")) with FunSuiteLi
|
||||
val PATH_BITCOIND_DATADIR = new File(INTEGRATION_TMP_DIR, "datadir-bitcoin")
|
||||
|
||||
var bitcoind: Process = null
|
||||
var bitcoinrpcclient: BitcoinJsonRPCClient = null
|
||||
var bitcoinrpcclient: BasicBitcoinJsonRPCClient = null
|
||||
var bitcoincli: ActorRef = null
|
||||
|
||||
implicit val formats = DefaultFormats
|
||||
@ -47,7 +47,7 @@ class BitcoinCoreWalletSpec extends TestKit(ActorSystem("test")) with FunSuiteLi
|
||||
Files.copy(classOf[BitcoinCoreWalletSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoind = s"$PATH_BITCOIND -datadir=$PATH_BITCOIND_DATADIR".run()
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method) => bitcoinrpcclient.invoke(method) pipeTo sender
|
||||
@ -85,7 +85,7 @@ class BitcoinCoreWalletSpec extends TestKit(ActorSystem("test")) with FunSuiteLi
|
||||
import collection.JavaConversions._
|
||||
val commonConfig = ConfigFactory.parseMap(Map("eclair.chain" -> "regtest", "eclair.spv" -> false, "eclair.server.public-ips.1" -> "localhost", "eclair.bitcoind.port" -> 28333, "eclair.bitcoind.rpcport" -> 28332, "eclair.bitcoind.zmq" -> "tcp://127.0.0.1:28334", "eclair.router-broadcast-interval" -> "2 second", "eclair.auto-reconnect" -> false))
|
||||
val config = ConfigFactory.load(commonConfig).getConfig("eclair")
|
||||
val bitcoinClient = new BitcoinJsonRPCClient(
|
||||
val bitcoinClient = new BasicBitcoinJsonRPCClient(
|
||||
user = config.getString("bitcoind.rpcuser"),
|
||||
password = config.getString("bitcoind.rpcpassword"),
|
||||
host = config.getString("bitcoind.host"),
|
||||
|
@ -9,7 +9,7 @@ import akka.actor.{Actor, ActorRef, ActorSystem, Props}
|
||||
import akka.pattern.pipe
|
||||
import akka.testkit.{TestKit, TestProbe}
|
||||
import fr.acinq.bitcoin.{Satoshi, Script}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinJsonRPCClient
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient}
|
||||
import fr.acinq.eclair.blockchain.{PublishAsap, WatchConfirmed, WatchEventConfirmed, WatchSpent}
|
||||
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_FUNDING_SPENT}
|
||||
import fr.acinq.eclair.randomKey
|
||||
@ -52,7 +52,7 @@ class BitcoinjSpec extends TestKit(ActorSystem("test")) with FunSuiteLike with B
|
||||
Files.copy(classOf[BitcoinjSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoind = s"$PATH_BITCOIND -datadir=$PATH_BITCOIND_DATADIR".run()
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method) => bitcoinrpcclient.invoke(method) pipeTo sender
|
||||
|
@ -8,7 +8,7 @@ import java.util.UUID
|
||||
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
|
||||
import akka.pattern.pipe
|
||||
import akka.testkit.{TestKit, TestProbe}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinJsonRPCClient
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient}
|
||||
import grizzled.slf4j.Logging
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.JsonAST.{JInt, JValue}
|
||||
@ -49,7 +49,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
||||
Files.createDirectories(PATH_BITCOIND_DATADIR.toPath)
|
||||
Files.copy(classOf[IntegrationSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method, Nil) =>
|
||||
|
@ -11,7 +11,7 @@ import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.bitcoin.{Base58, Base58Check, BinaryData, Block, Crypto, MilliSatoshi, OP_CHECKSIG, OP_DUP, OP_EQUAL, OP_EQUALVERIFY, OP_HASH160, OP_PUSHDATA, Satoshi, Script}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoinj.BitcoinjWallet
|
||||
import fr.acinq.eclair.blockchain.{Watch, WatchConfirmed}
|
||||
import fr.acinq.eclair.channel.Register.Forward
|
||||
@ -67,7 +67,7 @@ class BasicIntegrationSpvSpec extends TestKit(ActorSystem("test")) with FunSuite
|
||||
Files.copy(classOf[BasicIntegrationSpvSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoind = s"$PATH_BITCOIND -datadir=$PATH_BITCOIND_DATADIR".run()
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method) => bitcoinrpcclient.invoke(method) pipeTo sender
|
||||
|
@ -11,7 +11,7 @@ import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.bitcoin.{Base58, Base58Check, BinaryData, Block, Crypto, MilliSatoshi, OP_CHECKSIG, OP_DUP, OP_EQUAL, OP_EQUALVERIFY, OP_HASH160, OP_PUSHDATA, Satoshi, Script}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.{Watch, WatchConfirmed}
|
||||
import fr.acinq.eclair.channel.Register.Forward
|
||||
import fr.acinq.eclair.channel._
|
||||
@ -60,7 +60,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with FunSuiteLike wit
|
||||
Files.copy(classOf[IntegrationSpec].getResourceAsStream("/integration/bitcoin.conf"), new File(PATH_BITCOIND_DATADIR.toString, "bitcoin.conf").toPath)
|
||||
|
||||
bitcoind = s"$PATH_BITCOIND -datadir=$PATH_BITCOIND_DATADIR".run()
|
||||
bitcoinrpcclient = new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoinrpcclient = new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 28332)
|
||||
bitcoincli = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case BitcoinReq(method) => bitcoinrpcclient.invoke(method) pipeTo sender
|
||||
|
@ -4,7 +4,7 @@ import akka.actor.ActorSystem
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.bitcoin.{BinaryData, Block, Satoshi, Script, Transaction}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient, ExtendedBitcoinClient}
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate}
|
||||
import fr.acinq.eclair.{randomKey, toShortId}
|
||||
@ -27,7 +27,7 @@ class AnnouncementsBatchValidationSpec extends FunSuite {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
implicit val system = ActorSystem()
|
||||
implicit val extendedBitcoinClient = new ExtendedBitcoinClient(new BitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 18332))
|
||||
implicit val extendedBitcoinClient = new ExtendedBitcoinClient(new BasicBitcoinJsonRPCClient(user = "foo", password = "bar", host = "localhost", port = 18332))
|
||||
|
||||
val channels = for (i <- 0 until 50) yield {
|
||||
// let's generate a block every 10 txs so that we can compute short ids
|
||||
|
Loading…
Reference in New Issue
Block a user