Add an ability to use one Postgres database for all sub-projects (#1897)

* Add an ability to use one Postgres database for all sub-projects
This commit is contained in:
rorp 2020-08-26 12:20:18 -07:00 committed by GitHub
parent 581851f22c
commit 23685f124e
24 changed files with 102 additions and 86 deletions

View file

@ -34,7 +34,7 @@ class BlockchainTest extends ChainUnitTest {
case ConnectTipResult.ExtendChain(_, newChain) =>
assert(newHeader == newChain.tip)
case fail @ (_: ConnectTipResult.Reorg | _: ConnectTipResult.BadTip) =>
case _ @(_: ConnectTipResult.Reorg | _: ConnectTipResult.BadTip) =>
assert(false)
}
}

View file

@ -34,7 +34,7 @@ class ChainSyncTest extends ChainDbUnitTest {
//let's generate a block on bitcoind
val block1F =
bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(1, _))
val newChainHandlerF: Future[ChainApi] = block1F.flatMap { hashes =>
val newChainHandlerF: Future[ChainApi] = block1F.flatMap { _ =>
ChainSync.sync(chainHandler = chainHandler,
getBlockHeaderFunc = getBlockHeaderFunc,
getBestBlockHashFunc = getBestBlockHashFunc)

View file

@ -244,7 +244,7 @@ class BlockHeaderDAOTest extends ChainDbUnitTest {
bh.height == 1
}
val foundF = createdF.flatMap(created => blockHeaderDAO.find(f))
val foundF = createdF.flatMap(_ => blockHeaderDAO.find(f))
for {
created <- createdF

View file

@ -41,7 +41,7 @@ class TipValidationTest extends ChainDbUnitTest {
}
it must "fail to connect two blocks that do not reference prev block hash correctly" in {
bhDAO =>
_ =>
val badPrevHash = BlockHeaderHelper.badPrevHash
val expected = TipUpdateResult.BadPreviousBlockHash(badPrevHash)
@ -50,13 +50,13 @@ class TipValidationTest extends ChainDbUnitTest {
}
it must "fail to connect two blocks with two different POW requirements at the wrong interval" in {
bhDAO =>
_ =>
val badPOW = BlockHeaderHelper.badNBits
val expected = TipUpdateResult.BadPOW(badPOW)
runTest(badPOW, expected, blockchain)
}
it must "fail to connect two blocks with a bad nonce" in { bhDAO =>
it must "fail to connect two blocks with a bad nonce" in { _ =>
val badNonce = BlockHeaderHelper.badNonce
val expected = TipUpdateResult.BadNonce(badNonce)
runTest(badNonce, expected, blockchain)

View file

@ -8,7 +8,7 @@ import org.bitcoins.chain.models.{
}
import org.bitcoins.db.{DbManagement, JdbcProfileComponent}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
/**
* Responsible for creating and destroying database
@ -33,9 +33,4 @@ trait ChainDbManagement extends DbManagement {
override lazy val allTables: List[TableQuery[Table[_]]] =
List(chainTable, filterHeaderTable, filterTable)
def createHeaderTable(createIfNotExists: Boolean = true): Future[Unit] = {
createTable(chainTable, createIfNotExists)(ec)
}
}

View file

@ -416,7 +416,7 @@ case class BlockHeaderDAO()(implicit
/** A table that stores block headers related to a blockchain */
class BlockHeaderTable(tag: Tag)
extends Table[BlockHeaderDb](tag, "block_headers") {
extends Table[BlockHeaderDb](tag, schemaName, "block_headers") {
def height = column[Int]("height")

View file

@ -30,7 +30,7 @@ case class CompactFilterDAO()(implicit
}
class CompactFilterTable(tag: Tag)
extends Table[CompactFilterDb](tag, "cfilters") {
extends Table[CompactFilterDb](tag, schemaName, "cfilters") {
def hash = column[DoubleSha256DigestBE]("hash")

View file

@ -24,7 +24,7 @@ case class CompactFilterHeaderDAO()(implicit
}
class CompactFilterHeaderTable(tag: Tag)
extends Table[CompactFilterHeaderDb](tag, "cfheaders") {
extends Table[CompactFilterHeaderDb](tag, schemaName, "cfheaders") {
def hash = column[DoubleSha256DigestBE]("hash", O.PrimaryKey)

View file

@ -47,8 +47,11 @@ abstract class CRUDAutoInc[T <: DbRowAutoInc[T]](implicit
trait TableAutoIncComponent[T <: DbRowAutoInc[T]] { self: CRUDAutoInc[T] =>
import profile.api._
abstract class TableAutoInc[T](tag: profile.api.Tag, tableName: String)
extends profile.api.Table[T](tag, tableName) {
abstract class TableAutoInc[T](
tag: profile.api.Tag,
schemaName: Option[String],
tableName: String)
extends profile.api.Table[T](tag, schemaName, tableName) {
def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)
}
}

View file

@ -72,29 +72,54 @@ trait DbManagement extends BitcoinSLogger {
def dropTable(
table: TableQuery[Table[_]]
): Future[Unit] = {
val result = database.run(table.schema.dropIfExists)
val query = table.schema.dropIfExists
val result = database.run(query)
result
}
def dropTable(tableName: String): Future[Int] = {
val result = database.run(sqlu"""DROP TABLE IF EXISTS #$tableName""")
import scala.concurrent.ExecutionContext.Implicits.global
def dropTable(tableName: String)(implicit
ec: ExecutionContext): Future[Int] = {
val fullTableName = schemaName.map(_ + ".").getOrElse("") + tableName
val sql = sqlu"""DROP TABLE IF EXISTS #$fullTableName"""
val result = database.run(sql)
result.failed.foreach { ex =>
ex.printStackTrace()
}
result
}
def createSchema(createIfNotExists: Boolean = true)(implicit
ec: ExecutionContext): Future[Unit] =
schemaName match {
case None =>
FutureUtil.unit
case Some(schema) =>
val sql =
if (createIfNotExists)
sqlu"""CREATE SCHEMA IF NOT EXISTS #$schema"""
else
sqlu"""CREATE SCHEMA #$schema"""
database.run(sql).map(_ => ())
}
/** Executes migrations related to this database
*
* @see [[https://flywaydb.org/documentation/api/#programmatic-configuration-java]]
*/
def migrate(): Int = {
val module = appConfig.moduleName
val config =
Flyway
val config = {
val conf = Flyway
.configure()
.locations(s"classpath:${driverName}/${module}/migration/")
if (isPostgres) {
conf
.schemas(module)
.defaultSchema(module)
} else {
conf
}
}
val flyway = config.dataSource(jdbcUrl, username, password).load
try {
@ -110,4 +135,7 @@ trait DbManagement extends BitcoinSLogger {
flyway.migrate()
}
}
private def isPostgres =
appConfig.slickDbConfig.profile.getClass.getName == "slick.jdbc.PostgresProfile$"
}

View file

@ -38,6 +38,12 @@ trait JdbcProfileComponent[+ConfigType <: AppConfig] extends BitcoinSLogger {
parts(1)
}
lazy val schemaName: Option[String] =
if (driverName == "postgresql")
Some(appConfig.moduleName)
else
None
lazy val username: String = dbConfig.config.getString("db.username")
lazy val password: String = dbConfig.config.getString("db.password")

View file

@ -230,15 +230,27 @@ bitcoin-s {
common {
profile = "slick.jdbc.PostgresProfile$"
db {
driver = org.postgresql.Driver
url = "jdbc:postgresql://localhost:5432/database"
driver = "org.postgresql.Driver"
username = "user"
password = "topsecret"
numThreads = 5
}
}
chain.profile = ${bitcoin-s.common.profile}
chain.db = ${bitcoin-s.common.db}
node.profile = ${bitcoin-s.common.profile}
node.db = ${bitcoin-s.common.db}
wallet.profile = ${bitcoin-s.common.profile}
wallet.db = ${bitcoin-s.common.db}
}
```
The database driver will create a separate SQL namespace for each sub-project: `chain`, `node` and `wallet`.
You can also mix databases and drivers in one configuration. For example,
this configuration file enables SQLite for the `node` project (it's the default, so its configuration
is omitted), and the `walletdb` and `chaindb` PostgreSQL databases for the `wallet` and `chain` projects:
@ -248,8 +260,8 @@ bitcoin-s {
chain {
profile = "slick.jdbc.PostgresProfile$"
db {
driver = org.postgresql.Driver
url = "jdbc:postgresql://localhost:5432/chaindb"
driver = "org.postgresql.Driver"
username = "user"
password = "topsecret"
}
@ -257,8 +269,8 @@ bitcoin-s {
wallet {
profile = "slick.jdbc.PostgresProfile$"
db {
driver = org.postgresql.Driver
url = "jdbc:postgresql://localhost:5432/walletdb"
driver = "org.postgresql.Driver"
username = "user"
password = "topsecret"
}

View file

@ -55,7 +55,9 @@ final case class BroadcastAbleTransactionDAO()(implicit
/** Table over TXs we can broadcast over the P2P network */
class BroadcastAbleTransactionTable(tag: Tag)
extends Table[BroadcastAbleTransaction](tag, "broadcast_elements") {
extends Table[BroadcastAbleTransaction](tag,
schemaName,
"broadcast_elements") {
private type Tuple = (DoubleSha256DigestBE, ByteVector)
private val fromTuple: Tuple => BroadcastAbleTransaction = {

View file

@ -34,7 +34,7 @@ object BitcoinSTestAppConfig {
}
def getSpvWithEmbeddedDbTestConfig(
pgUrl: ProjectType => Option[String],
pgUrl: () => Option[String],
config: Config*)(implicit ec: ExecutionContext): BitcoinSAppConfig = {
val overrideConf = ConfigFactory.parseString {
"""
@ -67,7 +67,7 @@ object BitcoinSTestAppConfig {
}
def getNeutrinoWithEmbeddedDbTestConfig(
pgUrl: ProjectType => Option[String],
pgUrl: () => Option[String],
config: Config*)(implicit ec: ExecutionContext): BitcoinSAppConfig = {
val overrideConf = ConfigFactory.parseString {
"""
@ -102,13 +102,13 @@ object BitcoinSTestAppConfig {
*/
def configWithEmbeddedDb(
project: Option[ProjectType],
pgUrl: ProjectType => Option[String]): Config = {
pgUrl: () => Option[String]): Config = {
def pgConfigForProject(project: ProjectType): String = {
val name = project.toString().toLowerCase()
s""" $name.profile = "slick.jdbc.PostgresProfile$$"
| $name.db {
| url = "${pgUrl(project).getOrElse(
| url = "${pgUrl().getOrElse(
throw new RuntimeException(s"Cannot get db url for $project"))}"
| driver = "org.postgresql.Driver"
| username = "postgres"
@ -119,7 +119,7 @@ object BitcoinSTestAppConfig {
}
def configForProject(project: ProjectType) =
if (pgUrl(project).isDefined)
if (pgUrl().isDefined)
pgConfigForProject(project)
else
""

View file

@ -1,7 +1,6 @@
package org.bitcoins.testkit
import com.opentable.db.postgres.embedded.EmbeddedPostgres
import org.bitcoins.testkit.BitcoinSTestAppConfig.ProjectType
import org.scalatest.{BeforeAndAfterAll, Suite}
import scala.util.Try
@ -13,31 +12,11 @@ trait EmbeddedPg extends BeforeAndAfterAll { this: Suite =>
lazy val pg: Option[EmbeddedPostgres] =
if (pgEnabled) Some(EmbeddedPostgres.start()) else None
def pgUrl(dbname: String): Option[String] =
pg.map(_.getJdbcUrl("postgres", dbname))
def pgUrl(project: ProjectType): Option[String] =
project match {
case ProjectType.Wallet => pgUrl("walletdb")
case ProjectType.Node => pgUrl("nodedb")
case ProjectType.Chain => pgUrl("chaindb")
case ProjectType.Test => pgUrl("testdb")
}
override def beforeAll(): Unit = {
super.beforeAll()
executePgSql(s"CREATE DATABASE chaindb")
executePgSql(s"CREATE DATABASE walletdb")
executePgSql(s"CREATE DATABASE nodedb")
executePgSql(s"CREATE DATABASE testdb")
}
def pgUrl(): Option[String] =
pg.map(_.getJdbcUrl(userName = "postgres", dbName = "postgres"))
override def afterAll(): Unit = {
super.afterAll()
Try(executePgSql(s"DROP DATABASE nodedb"))
Try(executePgSql(s"DROP DATABASE walletdb"))
Try(executePgSql(s"DROP DATABASE chaindb"))
Try(executePgSql(s"DROP DATABASE testdb"))
Try(pg.foreach(_.close()))
()
}

View file

@ -10,13 +10,7 @@ import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
BlockHeaderDbHelper,
ChainApi,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.api.chain.db._
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.db.AppConfig
@ -359,8 +353,6 @@ object ChainUnitTest extends ChainVerificationLogger {
// The height of the first block in the json file
val OFFSET: Int = FIRST_BLOCK_HEIGHT
val tableSetupF = ChainUnitTest.setupHeaderTable()
val source =
scala.io.Source.fromURL(getClass.getResource("/block_headers.json"))
val arrStr = source.getLines.next
@ -412,21 +404,20 @@ object ChainUnitTest extends ChainVerificationLogger {
dbHeaders = dbHeaders,
batchesSoFar = Vector.empty)
val chainHandlerF = ChainUnitTest.makeChainHandler()
val insertedF = tableSetupF.flatMap { _ =>
batchedDbHeaders.foldLeft(
for {
_ <- ChainUnitTest.setupAllTables()
chainHandler <- ChainUnitTest.makeChainHandler()
_ <- batchedDbHeaders.foldLeft(
Future.successful[Vector[BlockHeaderDb]](Vector.empty)) {
case (fut, batch) =>
for {
_ <- fut
chainHandler <- chainHandlerF
headers <- chainHandler.blockHeaderDAO.createAll(batch)
} yield headers
}
} yield {
chainHandler.blockHeaderDAO
}
insertedF.flatMap(_ => chainHandlerF.map(_.blockHeaderDAO))
}
}
@ -483,12 +474,6 @@ object ChainUnitTest extends ChainVerificationLogger {
BitcoindRpcTestUtil.stopServer(bitcoind)
}
/** Creates the [[org.bitcoins.chain.models.BlockHeaderTable]] */
private def setupHeaderTable()(implicit
appConfig: ChainAppConfig): Future[Unit] = {
appConfig.createHeaderTable(createIfNotExists = true)
}
def setupAllTables()(implicit
appConfig: ChainAppConfig,
ec: ExecutionContext): Future[Unit] =

View file

@ -59,7 +59,7 @@ case class AccountDAO()(implicit
}
class AccountTable(tag: Tag)
extends Table[AccountDb](tag, "wallet_accounts") {
extends Table[AccountDb](tag, schemaName, "wallet_accounts") {
def xpub: Rep[ExtPublicKey] = column[ExtPublicKey]("xpub")

View file

@ -309,7 +309,8 @@ case class AddressDAO()(implicit
* todo: this needs design rework.
* todo: https://github.com/bitcoin-s/bitcoin-s-core/pull/391#discussion_r274188334
*/
class AddressTable(tag: Tag) extends Table[AddressRecord](tag, "addresses") {
class AddressTable(tag: Tag)
extends Table[AddressRecord](tag, schemaName, "addresses") {
def purpose: Rep[HDPurpose] = column("hd_purpose")

View file

@ -166,7 +166,7 @@ case class AddressTagDAO()(implicit
}
class AddressTagTable(t: Tag)
extends Table[AddressTagDb](t, "wallet_address_tags") {
extends Table[AddressTagDb](t, schemaName, "wallet_address_tags") {
def address: Rep[BitcoinAddress] = column[BitcoinAddress]("address")

View file

@ -24,7 +24,9 @@ case class IncomingTransactionDAO()(implicit
}
class IncomingTransactionTable(tag: Tag)
extends TxTable[IncomingTransactionDb](tag, "wallet_incoming_txs") {
extends TxTable[IncomingTransactionDb](tag,
schemaName,
"wallet_incoming_txs") {
private val mappers = new org.bitcoins.db.DbCommonsColumnMappers(profile)
import mappers._

View file

@ -25,7 +25,9 @@ case class OutgoingTransactionDAO()(implicit
}
class OutgoingTransactionTable(tag: Tag)
extends TxTable[OutgoingTransactionDb](tag, "wallet_outgoing_txs") {
extends TxTable[OutgoingTransactionDb](tag,
schemaName,
"wallet_outgoing_txs") {
private val mappers = new org.bitcoins.db.DbCommonsColumnMappers(profile)
import mappers._

View file

@ -23,7 +23,7 @@ case class ScriptPubKeyDAO()(implicit
TableQuery[ScriptPubKeyTable]
case class ScriptPubKeyTable(tag: Tag)
extends TableAutoInc[ScriptPubKeyDb](tag, "pub_key_scripts") {
extends TableAutoInc[ScriptPubKeyDb](tag, schemaName, "pub_key_scripts") {
def scriptPubKey: Rep[ScriptPubKey] = column("script_pub_key")
def scriptType: Rep[ScriptType] = column("script_type")

View file

@ -467,7 +467,7 @@ case class SpendingInfoDAO()(implicit
* TXID of the transaction that created this output.
*/
case class SpendingInfoTable(tag: Tag)
extends TableAutoInc[UTXORecord](tag, "txo_spending_info") {
extends TableAutoInc[UTXORecord](tag, schemaName, "txo_spending_info") {
def outPoint: Rep[TransactionOutPoint] =
column("tx_outpoint", O.Unique)

View file

@ -17,8 +17,9 @@ trait TxCRUDComponent[DbEntryType <: TxDB] {
abstract class TxTable[DbEntryType <: TxDB](
tag: profile.api.Tag,
schemaName: Option[String],
tableName: String)
extends Table[DbEntryType](tag, tableName) {
extends Table[DbEntryType](tag, schemaName, tableName) {
def txIdBE: Rep[DoubleSha256DigestBE]
}
}
@ -89,7 +90,7 @@ case class TransactionDAO()(implicit
override val table = TableQuery[TransactionTable]
class TransactionTable(tag: Tag)
extends TxTable[TransactionDb](tag, "tx_table") {
extends TxTable[TransactionDb](tag, schemaName, "tx_table") {
def txIdBE: Rep[DoubleSha256DigestBE] = column("txIdBE", O.Unique)