Make CRUD.run and CRUD.runVec transactional (#4059)

* Make `CRUD.run` and `CRUD.runVec` transactional

* remove explicit `transactionally` calls
This commit is contained in:
rorp 2022-02-07 15:42:48 -08:00 committed by GitHub
parent 5aeecdb893
commit 48189d5c1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 63 additions and 71 deletions

View file

@ -59,7 +59,7 @@ case class BlockHeaderDAO()(implicit
Vector[Option[BlockHeaderDb]]] = {
val query = findByPrimaryKeys(hashes)
val resultsF: Future[Vector[BlockHeaderDb]] =
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
for {
results <- resultsF
} yield {

View file

@ -144,7 +144,6 @@ case class CompactFilterDAO()(implicit
.take(1)
.map(_._1)
.result
.transactionally
}
/** Gets the heaviest filter from the database */

View file

@ -137,7 +137,6 @@ case class CompactFilterHeaderDAO()(implicit
.take(1)
.map(_._1)
.result
.transactionally
}
/** Fetches the best filter header from the database _without_ context

View file

@ -75,12 +75,12 @@ abstract class CRUD[T, PrimaryKeyType](implicit
/** Update the corresponding record in the database */
def update(t: T): Future[T] = {
val action = updateAction(t).transactionally
val action = updateAction(t)
safeDatabase.run(action)
}
def updateAll(ts: Vector[T]): Future[Vector[T]] = {
val actions = updateAllAction(ts).transactionally
val actions = updateAllAction(ts)
safeDatabase.runVec(actions)
}
@ -92,18 +92,18 @@ abstract class CRUD[T, PrimaryKeyType](implicit
def delete(t: T): Future[Int] = {
logger.debug("Deleting record: " + t)
val action = deleteAction(t)
safeDatabase.run(action.transactionally)
safeDatabase.run(action)
}
def deleteAll(ts: Vector[T]): Future[Int] = {
val action = deleteAllAction(ts).transactionally
val action = deleteAllAction(ts)
safeDatabase.run(action)
}
/** delete all records from the table
*/
def deleteAll(): Future[Int] = {
val action = deleteAllAction().transactionally
val action = deleteAllAction()
safeDatabase.run(action)
}
@ -127,7 +127,7 @@ abstract class CRUD[T, PrimaryKeyType](implicit
def oldUpsertAll(ts: Vector[T]): Future[Vector[T]] = {
val actions = ts.map(t => table.insertOrUpdate(t))
for {
_ <- safeDatabase.run(DBIO.sequence(actions).transactionally)
_ <- safeDatabase.run(DBIO.sequence(actions))
result <- safeDatabase.runVec(findAll(ts).result)
} yield result
@ -150,7 +150,10 @@ case class SafeDatabase(jdbcProfile: JdbcProfileComponent[DbAppConfig])
extends Logging {
import jdbcProfile.database
import jdbcProfile.profile.api.actionBasedSQLInterpolation
import jdbcProfile.profile.api.{
actionBasedSQLInterpolation,
jdbcActionExtensionMethods
}
/** SQLite does not enable foreign keys by default. This query is
* used to enable it. It must be included in all connections to
@ -173,10 +176,13 @@ case class SafeDatabase(jdbcProfile: JdbcProfileComponent[DbAppConfig])
/** Runs the given DB action */
def run[R](action: DBIOAction[R, NoStream, _])(implicit
ec: ExecutionContext): Future[R] = {
val result =
if (sqlite) database.run[R](foreignKeysPragma >> action)
else database.run[R](action)
result.recoverWith { logAndThrowError(action) }
val result = scala.concurrent.blocking {
if (sqlite) database.run[R](foreignKeysPragma >> action.transactionally)
else database.run[R](action.transactionally)
}
result.recoverWith {
logAndThrowError(action)
}
}
/** Runs the given DB sequence-returning DB action
@ -185,10 +191,13 @@ case class SafeDatabase(jdbcProfile: JdbcProfileComponent[DbAppConfig])
def runVec[R](action: DBIOAction[Seq[R], NoStream, _])(implicit
ec: ExecutionContext): Future[Vector[R]] = {
val result = scala.concurrent.blocking {
if (sqlite) database.run[Seq[R]](foreignKeysPragma >> action)
else database.run[Seq[R]](action)
if (sqlite)
database.run[Seq[R]](foreignKeysPragma >> action.transactionally)
else database.run[Seq[R]](action.transactionally)
}
result.map(_.toVector).recoverWith {
logAndThrowError(action)
}
result.map(_.toVector).recoverWith { logAndThrowError(action) }
}
}

View file

@ -109,7 +109,7 @@ abstract class CRUDAction[T, PrimaryKeyType](implicit
Int,
NoStream,
Effect.Write with Effect.Transactional] = {
table.delete.transactionally
table.delete
}
}

View file

@ -29,7 +29,7 @@ abstract class CRUDAutoInc[T <: DbRowAutoInc[T]](implicit
override def createAll(ts: Vector[T]): Future[Vector[T]] = {
val actions = createAllAction(ts)
safeDatabase.runVec(actions.transactionally)
safeDatabase.runVec(actions)
}
override def findByPrimaryKeys(

View file

@ -28,7 +28,7 @@ trait SlickUtil[T, PrimaryKeyType] extends SlickUtilAction[T, PrimaryKeyType] {
/** Creates rows in a database that are not auto incremented */
def createAllNoAutoInc(ts: Vector[T], database: SafeDatabase)(implicit
ec: ExecutionContext): Future[Vector[T]] = {
val actions = (table ++= ts).andThen(DBIO.successful(ts)).transactionally
val actions = (table ++= ts).andThen(DBIO.successful(ts))
val result = database.run(actions)
result
}

View file

@ -64,7 +64,7 @@ case class MasterXPubDAO()(implicit
new SQLException(s"Only 1 master xpub should be stored, got=$count"))
}
database.run(action.transactionally).map(_ => t)
database.run(action).map(_ => t)
}
override def createAll(
@ -130,7 +130,7 @@ case class MasterXPubDAO()(implicit
new SQLException(s"Only 1 master xpub should be stored, got=$count"))
}
database.run(action.transactionally).map(_ => ())
database.run(action).map(_ => ())
}
class MasterXpubTable(tag: Tag)

View file

@ -271,7 +271,7 @@ case class DLCOracle()(implicit val conf: DLCOracleAppConfig)
eventDbsA = eventDAO.createAllAction(eventDbs)
eventOutcomeDbsA = eventOutcomeDAO.createAllAction(eventOutcomeDbs)
actions = DBIO.seq(rValueA, eventDbsA, eventOutcomeDbsA)
_ <- safeDatabase.run(actions.transactionally)
_ <- safeDatabase.run(actions)
} yield {
OracleEvent.fromEventDbs(eventDbs).announcementTLV
}
@ -365,7 +365,7 @@ case class DLCOracle()(implicit val conf: DLCOracleAppConfig)
nonce: SchnorrNonce,
outcome: DLCAttestationType): Future[EventDb] = {
val actionF = createAttestationActionF(nonce, outcome)
actionF.flatMap(action => safeDatabase.run(action.transactionally))
actionF.flatMap(action => safeDatabase.run(action))
}
override def signDigits(eventName: String, num: Long): Future[OracleEvent] = {
@ -439,7 +439,7 @@ case class DLCOracle()(implicit val conf: DLCOracleAppConfig)
for {
signSig <- signSigF
digitSigA <- digitSigAF
digitSigs <- safeDatabase.run(digitSigA.transactionally)
digitSigs <- safeDatabase.run(digitSigA)
} yield OracleEvent.fromEventDbs(signSig ++ digitSigs)
}

View file

@ -46,26 +46,26 @@ case class EventDAO()(implicit
def getPendingEvents: Future[Vector[EventDb]] = {
val query = table.filter(_.attestationOpt.isEmpty)
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
def getCompletedEvents: Future[Vector[EventDb]] = {
val query = table.filter(_.attestationOpt.isDefined)
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
def findByEventName(name: String): Future[Vector[EventDb]] = {
val query = table.filter(_.eventName === name)
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
def findByEventDescriptor(
descriptorTLV: EventDescriptorTLV): Future[Vector[EventDb]] = {
val query = table.filter(_.eventDescriptorTLV === descriptorTLV)
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
def findByOracleEventTLV(
@ -75,13 +75,13 @@ case class EventDAO()(implicit
table.filter(_.nonce.inSet(v0.nonces))
}
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
def findDifferentPublicKey(key: SchnorrPublicKey): Future[Vector[EventDb]] = {
val query = table.filterNot(_.pubkey === key)
safeDatabase.runVec(query.result.transactionally)
safeDatabase.runVec(query.result)
}
class EventTable(tag: Tag) extends Table[EventDb](tag, schemaName, "events") {

View file

@ -51,7 +51,7 @@ case class EventOutcomeDAO()(implicit
def findByNonces(
nonces: Vector[SchnorrNonce]): Future[Vector[EventOutcomeDb]] = {
val action = table.filter(_.nonce.inSet(nonces)).result.transactionally
val action = table.filter(_.nonce.inSet(nonces)).result
safeDatabase.runVec(action)
}
@ -61,7 +61,7 @@ case class EventOutcomeDAO()(implicit
val query =
table.filter(item => item.nonce === nonce && item.hashedMessage === hash)
safeDatabase.run(query.result.transactionally).map(_.headOption)
safeDatabase.run(query.result).map(_.headOption)
}
class EventOutcomeTable(tag: Tag)

View file

@ -40,14 +40,14 @@ case class RValueDAO()(implicit
}
def findByNonces(nonces: Vector[SchnorrNonce]): Future[Vector[RValueDb]] = {
val action = table.filter(_.nonce.inSet(nonces)).result.transactionally
val action = table.filter(_.nonce.inSet(nonces)).result
safeDatabase.runVec(action)
}
def maxKeyIndex: Future[Option[Int]] = {
val query = table.map(_.keyIndex).max
safeDatabase.run(query.result.transactionally)
safeDatabase.run(query.result)
}
class RValueTable(tag: Tag)

View file

@ -14,13 +14,7 @@ import org.bitcoins.core.wallet.utxo.AddressTag
import org.bitcoins.crypto.{DoubleSha256DigestBE, SchnorrDigitalSignature}
import org.bitcoins.db.SafeDatabase
import org.bitcoins.dlc.wallet.DLCWallet
import org.bitcoins.dlc.wallet.models.{
AcceptDbState,
DLCCETSignaturesDb,
DLCFundingInputDb,
DLCRefundSigsDb,
OfferedDbState
}
import org.bitcoins.dlc.wallet.models._
import org.bitcoins.wallet.internal.TransactionProcessing
import scala.concurrent._
@ -30,8 +24,6 @@ import scala.concurrent._
*/
private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
self: DLCWallet =>
import dlcDAO.profile.api._
private lazy val safeDatabase: SafeDatabase = dlcDAO.safeDatabase
private lazy val dlcDataManagement: DLCDataManagement = DLCDataManagement(
@ -226,7 +218,7 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
_ <- updateAnnouncementA
} yield updatedDlcDb
}
updatedDlcDb <- safeDatabase.run(actions.transactionally)
updatedDlcDb <- safeDatabase.run(actions)
} yield {
logger.info(
s"Done calculating RemoteClaimed outcome for dlcId=${dlcId.hex}")

View file

@ -7,7 +7,6 @@ import scala.concurrent.Future
/** Helper methods for querying by dlcId whne the dlcId is the primary key on the table */
trait DLCIdDaoUtil[T, PrimaryKeyType] { _: CRUD[T, PrimaryKeyType] =>
import profile.api._
def findByDLCIdAction(dlcId: Sha256Digest): profile.api.DBIOAction[
Option[T],
@ -23,7 +22,7 @@ trait DLCIdDaoUtil[T, PrimaryKeyType] { _: CRUD[T, PrimaryKeyType] =>
profile.api.Effect.Read]
def findByDLCId(dlcId: Sha256Digest): Future[Option[T]] = {
safeDatabase.run(findByDLCIdAction(dlcId).transactionally)
safeDatabase.run(findByDLCIdAction(dlcId))
}
def deleteByDLCIdAction(dlcId: Sha256Digest): profile.api.DBIOAction[
@ -32,7 +31,7 @@ trait DLCIdDaoUtil[T, PrimaryKeyType] { _: CRUD[T, PrimaryKeyType] =>
profile.api.Effect.Write]
def deleteByDLCId(dlcId: Sha256Digest): Future[Int] = {
safeDatabase.run(deleteByDLCIdAction(dlcId).transactionally)
safeDatabase.run(deleteByDLCIdAction(dlcId))
}
}
@ -40,7 +39,6 @@ trait DLCIdDaoUtil[T, PrimaryKeyType] { _: CRUD[T, PrimaryKeyType] =>
* key on the table
*/
trait DLCIdDaoUtilNoPK[T] { _: CRUD[T, _] =>
import profile.api._
def findByDLCIdAction(dlcId: Sha256Digest): profile.api.DBIOAction[
Vector[T],
@ -48,7 +46,7 @@ trait DLCIdDaoUtilNoPK[T] { _: CRUD[T, _] =>
profile.api.Effect.Read]
def findByDLCId(dlcId: Sha256Digest): Future[Vector[T]] = {
safeDatabase.runVec(findByDLCIdAction(dlcId).transactionally)
safeDatabase.runVec(findByDLCIdAction(dlcId))
}
def deleteByDLCIdAction(dlcId: Sha256Digest): profile.api.DBIOAction[
@ -57,6 +55,6 @@ trait DLCIdDaoUtilNoPK[T] { _: CRUD[T, _] =>
profile.api.Effect.Write]
def deleteByDLCId(dlcId: Sha256Digest): Future[Int] = {
safeDatabase.run(deleteByDLCIdAction(dlcId).transactionally)
safeDatabase.run(deleteByDLCIdAction(dlcId))
}
}

View file

@ -48,7 +48,6 @@ case class DLCActionBuilder(dlcWalletDAOs: DLCWalletDAOs) {
val allActions = DBIO
.sequence(actions)
.map(_ => ())
.transactionally
allActions
}
@ -73,7 +72,6 @@ case class DLCActionBuilder(dlcWalletDAOs: DLCWalletDAOs) {
val allActions = DBIO
.sequence(actions)
.map(_ => ())
.transactionally
allActions
}
@ -105,7 +103,7 @@ case class DLCActionBuilder(dlcWalletDAOs: DLCWalletDAOs) {
_ <- deleteDlcA
} yield ()
action.transactionally
action
}
/** Retrieves a DBIOAction that fetches the global dlc db,
@ -168,6 +166,6 @@ case class DLCActionBuilder(dlcWalletDAOs: DLCWalletDAOs) {
_ <- dlcAnnouncementDAO.updateAllAction(updatedDbs)
} yield updateNonces
updateAction.transactionally
updateAction
}
}

View file

@ -31,14 +31,15 @@ import org.bitcoins.core.wallet.keymanagement.KeyManagerParams
import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo._
import org.bitcoins.crypto._
import org.bitcoins.db.models.MasterXPubDAO
import org.bitcoins.db.SafeDatabase
import org.bitcoins.db.models.MasterXPubDAO
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.internal._
import org.bitcoins.wallet.models._
import scodec.bits.ByteVector
import slick.dbio.{DBIOAction, Effect, NoStream}
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Random, Success}
@ -1076,12 +1077,11 @@ object Wallet extends WalletLogger {
}
accounts
}
import wallet.accountDAO.profile.api._
for {
_ <- createMasterXpubF
actions = createAccountActions
accounts <- wallet.accountDAO.safeDatabase.runVec(
DBIOAction.sequence(actions).transactionally)
DBIOAction.sequence(actions))
_ = accounts.foreach { a =>
logger.info(s"Created account=${a} to DB")
}

View file

@ -72,7 +72,7 @@ case class AddressDAO()(implicit
} yield (addr, spk)
safeDatabase
.run(actions.transactionally)
.run(actions)
.map {
case (Some(addr), Some(spk)) => addr.toAddressDb(spk.scriptPubKey)
case _ =>
@ -108,7 +108,7 @@ case class AddressDAO()(implicit
} yield (addr, spk)
safeDatabase
.run(actions.transactionally)
.run(actions)
.map {
case (Some(addr), Some(spk)) => addr.toAddressDb(spk.scriptPubKey)
case _ =>
@ -122,7 +122,7 @@ case class AddressDAO()(implicit
spkTable.filter(_.scriptPubKey === addressDb.scriptPubKey).delete
val addrDelete = table.filter(_.address === addressDb.address).delete
safeDatabase
.run(DBIO.sequence(Seq(addrDelete, spkDelete)).transactionally)
.run(DBIO.sequence(Seq(addrDelete, spkDelete)))
.map(_.sum)
}
@ -289,7 +289,7 @@ case class AddressDAO()(implicit
.filter(_._2.scriptPubKey.inSet(spks))
safeDatabase
.runVec(query.result.transactionally)
.runVec(query.result)
.map(res =>
res.map { case (addrRec, spkRec) =>
addrRec.toAddressDb(spkRec.scriptPubKey)

View file

@ -106,7 +106,7 @@ case class AddressTagDAO()(implicit
def findByTagType(tagType: AddressTagType): Future[Vector[AddressTagDb]] = {
val query = table.filter(_.tagType === tagType)
safeDatabase.run(query.result.transactionally).map(_.toVector)
safeDatabase.run(query.result).map(_.toVector)
}
def dropByTagType(tagType: AddressTagType): Future[Int] = {

View file

@ -38,7 +38,7 @@ case class ScriptPubKeyDAO()(implicit
}
} yield spk
safeDatabase.run(actions.transactionally)
safeDatabase.run(actions)
}
/** Finds a scriptPubKey in the database, if it exists */

View file

@ -78,7 +78,7 @@ case class SpendingInfoDAO()(implicit
} yield (utxo, spk)
safeDatabase
.run(actions.transactionally)
.run(actions)
.map {
case (utxo, Some(spk)) => utxo.toSpendingInfoDb(spk.scriptPubKey)
case _ =>
@ -128,7 +128,7 @@ case class SpendingInfoDAO()(implicit
.headOption
} yield (utxo, spk)
safeDatabase
.run(actions.transactionally)
.run(actions)
.map {
case (Some(utxo), Some(spk)) => utxo.toSpendingInfoDb(spk.scriptPubKey)
case _ =>
@ -164,7 +164,7 @@ case class SpendingInfoDAO()(implicit
.headOption
} yield (utxo, spk)
safeDatabase
.run(actions.transactionally)
.run(actions)
.map {
case (Some(utxo), Some(spk)) => utxo.toSpendingInfoDb(spk.scriptPubKey)
case _ =>
@ -473,9 +473,6 @@ case class SpendingInfoDAO()(implicit
DBIO.successful(count)
}
}
//this needs to be at the end, to make sure we rollback correctly if
//the utxo is already reserved
.transactionally
safeDatabase
.run(action)

View file

@ -67,7 +67,7 @@ trait TxDAO[DbEntryType <: TxDB]
txIdBEs: Vector[DoubleSha256DigestBE]): Future[Vector[DbEntryType]] = {
val q = table.filter(_.txIdBE.inSet(txIdBEs))
safeDatabase.runVec(q.result.transactionally)
safeDatabase.runVec(q.result)
}
def findByTxId(txIdBE: DoubleSha256DigestBE): Future[Option[DbEntryType]] = {