Implement createCETsAndCETSigsAsync() to fix performance issue in test (#3089)

* Implement createCETsAndCETSigsAsync(), use this inside of TestDLCClient.setupDLCAccept()

* Switch to lazy val to cache sig point computation

* Add FutureUtil.batchAndParallelExecute() that does not take batchSize parameter, just compute it inside as a sane deafult

* Fix 2.12.x compiler error
This commit is contained in:
Chris Stewart 2021-05-14 17:59:56 -05:00 committed by GitHub
parent 73668bb66c
commit 3205e4e275
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 33 deletions

View File

@ -1,7 +0,0 @@
package org.bitcoins.core.protocol.dlc.models
import org.bitcoins.crypto.ECPublicKey
case class SigPointComputer(private val computeSigPoint: () => ECPublicKey) {
lazy val compute: ECPublicKey = computeSigPoint()
}

View File

@ -1,14 +0,0 @@
package org.bitcoins.core.protocol.dlc.models
import org.bitcoins.crypto.ECPublicKey
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
case class SigPointComputer(private val computeSigPoint: () => ECPublicKey) {
private val sigPointF = Future(computeSigPoint())(ExecutionContext.global)
lazy val compute: ECPublicKey = {
Await.result(sigPointF, 20.seconds)
}
}

View File

@ -28,8 +28,8 @@ sealed trait OracleOutcome {
protected def computeSigPoint: ECPublicKey
/** The adaptor point used to encrypt the signatures for this corresponding CET. */
def sigPoint: ECPublicKey = {
SigPointComputer(() => computeSigPoint).compute
lazy val sigPoint: ECPublicKey = {
computeSigPoint
}
/** The sum of all oracle nonces used in execution with this OracleOutcome. */

View File

@ -194,11 +194,6 @@ case class DLCTxSigner(
def createCETSigsAsync()(implicit
ec: ExecutionContext): Future[CETSignatures] = {
val outcomes = builder.contractInfo.allOutcomes
//divide and conquer
//we want a batch size of at least 1
val size =
Math.max(outcomes.length / Runtime.getRuntime.availableProcessors(), 1)
val computeBatchFn: Vector[OracleOutcome] => Future[
Vector[(OracleOutcome, ECAdaptorSignature)]] = {
@ -210,8 +205,7 @@ case class DLCTxSigner(
val cetSigsF: Future[Vector[(OracleOutcome, ECAdaptorSignature)]] = {
FutureUtil.batchAndParallelExecute(elements = outcomes,
f = computeBatchFn,
batchSize = size)
f = computeBatchFn)
}.map(_.flatten)
for {
@ -225,10 +219,38 @@ case class DLCTxSigner(
val cetsAndSigs = buildAndSignCETs(builder.contractInfo.allOutcomes)
val (msgs, cets, sigs) = cetsAndSigs.unzip3
val refundSig = signRefundTx
(CETSignatures(msgs.zip(sigs), refundSig), cets)
}
/** The equivalent of [[createCETsAndCETSigs()]] but async */
def createCETsAndCETSigsAsync()(implicit
ec: ExecutionContext): Future[(CETSignatures, Vector[WitnessTransaction])] = {
val outcomes = builder.contractInfo.allOutcomes
val fn = { outcomes: Vector[OracleOutcome] =>
Future {
buildAndSignCETs(outcomes)
}
}
val cetsAndSigsF: Future[Vector[
Vector[(OracleOutcome, WitnessTransaction, ECAdaptorSignature)]]] = {
FutureUtil.batchAndParallelExecute[OracleOutcome,
Vector[(
OracleOutcome,
WitnessTransaction,
ECAdaptorSignature)]](elements =
outcomes,
f = fn)
}
val refundSig = signRefundTx
for {
cetsAndSigsNested <- cetsAndSigsF
cetsAndSigs = cetsAndSigsNested.flatten
(msgs, cets, sigs) = cetsAndSigs.unzip3
} yield (CETSignatures(msgs.zip(sigs), refundSig), cets)
}
/** Creates this party's CETSignatures given the outcomes and their unsigned CETs */
def createCETSigs(outcomesAndCETs: Vector[OutcomeCETPair]): CETSignatures = {
val cetSigs = signGivenCETs(outcomesAndCETs)

View File

@ -103,9 +103,24 @@ object FutureUtil {
elements: Vector[T],
f: Vector[T] => Future[U],
batchSize: Int)(implicit ec: ExecutionContext): Future[Vector[U]] = {
require(batchSize > 0, s"Cannot have batch size 0 or less, got=$batchSize")
val batches = elements.grouped(batchSize).toVector
val execute: Vector[Future[U]] = batches.map(b => f(b))
val doneF = Future.sequence(execute)
doneF
}
/** Same as [[batchAndParallelExecute()]], but computes the batchSize based on the
* number of available processors on your machine
*/
def batchAndParallelExecute[T, U](
elements: Vector[T],
f: Vector[T] => Future[U])(implicit
ec: ExecutionContext): Future[Vector[U]] = {
//divide and conquer
val batchSize =
Math.max(elements.length / Runtime.getRuntime.availableProcessors(), 1)
batchAndParallelExecute(elements, f, batchSize)
}
}

View File

@ -68,9 +68,8 @@ case class TestDLCClient(
getSigs: Future[(CETSignatures, FundingSignatures)]): Future[SetupDLC] = {
require(!isInitiator, "You should call setupDLCOffer")
val (remoteCetSigs, cets) = dlcTxSigner.createCETsAndCETSigs()
for {
(remoteCetSigs, cets) <- dlcTxSigner.createCETsAndCETSigsAsync()
_ <- sendSigs(remoteCetSigs)
(cetSigs, fundingSigs) <- getSigs
setupDLC <- Future.fromTry {