mirror of
https://github.com/ACINQ/eclair.git
synced 2024-11-19 18:10:42 +01:00
Use less strict isolation level for channel meta (#1790)
We don't need `SERIALIZABLE` consistency guarantees when all we do is updating timestamp columns. This happens concurrently to channel data update and raised serialization errors in postgres. Fixed #1786. Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
This commit is contained in:
parent
3079cb4fc6
commit
a8d4e07bdd
@ -16,6 +16,7 @@
|
||||
|
||||
package fr.acinq.eclair.db.pg
|
||||
|
||||
import com.zaxxer.hikari.util.IsolationLevel
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.eclair.CltvExpiry
|
||||
import fr.acinq.eclair.channel.HasCommitments
|
||||
@ -102,7 +103,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
|
||||
* Helper method to factor updating timestamp columns
|
||||
*/
|
||||
private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
|
||||
inTransaction { pg =>
|
||||
inTransaction(IsolationLevel.TRANSACTION_READ_UNCOMMITTED) { pg =>
|
||||
using(pg.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
|
||||
statement.setTimestamp(1, Timestamp.from(Instant.now()))
|
||||
statement.setString(2, channelId.toHex)
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package fr.acinq.eclair.db.pg
|
||||
|
||||
import com.zaxxer.hikari.util.IsolationLevel
|
||||
import fr.acinq.eclair.db.Monitoring.Metrics._
|
||||
import fr.acinq.eclair.db.Monitoring.Tags
|
||||
import fr.acinq.eclair.db.jdbc.JdbcUtils
|
||||
@ -23,7 +24,7 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException
|
||||
import grizzled.slf4j.Logging
|
||||
import org.postgresql.util.{PGInterval, PSQLException}
|
||||
|
||||
import java.sql.{Connection, Statement, Timestamp}
|
||||
import java.sql.{Connection, Timestamp}
|
||||
import java.util.UUID
|
||||
import javax.sql.DataSource
|
||||
import scala.concurrent.duration._
|
||||
@ -242,11 +243,14 @@ object PgUtils extends JdbcUtils {
|
||||
|
||||
}
|
||||
|
||||
def inTransaction[T](connection: Connection)(f: Connection => T): T = {
|
||||
val autoCommit = connection.getAutoCommit
|
||||
/**
|
||||
* @param isolationLevel Be careful when changing the default value
|
||||
*/
|
||||
private def inTransactionInternal[T](isolationLevel: IsolationLevel)(connection: Connection)(f: Connection => T): T = {
|
||||
val previousAutoCommit = connection.getAutoCommit
|
||||
connection.setAutoCommit(false)
|
||||
val isolationLevel = connection.getTransactionIsolation
|
||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE)
|
||||
val previousIsolationLevel = connection.getTransactionIsolation
|
||||
connection.setTransactionIsolation(isolationLevel.getLevelId)
|
||||
try {
|
||||
val res = f(connection)
|
||||
connection.commit()
|
||||
@ -256,14 +260,20 @@ object PgUtils extends JdbcUtils {
|
||||
connection.rollback()
|
||||
throw ex
|
||||
} finally {
|
||||
connection.setAutoCommit(autoCommit)
|
||||
connection.setTransactionIsolation(isolationLevel)
|
||||
connection.setAutoCommit(previousAutoCommit)
|
||||
connection.setTransactionIsolation(previousIsolationLevel)
|
||||
}
|
||||
}
|
||||
|
||||
def inTransaction[T](f: Connection => T)(implicit dataSource: DataSource): T = {
|
||||
withConnection { connection =>
|
||||
inTransaction(connection)(f)
|
||||
inTransactionInternal(IsolationLevel.TRANSACTION_SERIALIZABLE)(connection)(f)
|
||||
}
|
||||
}
|
||||
|
||||
def inTransaction[T](isolationLevel: IsolationLevel)(f: Connection => T)(implicit dataSource: DataSource): T = {
|
||||
withConnection { connection =>
|
||||
inTransactionInternal(isolationLevel)(connection)(f)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,8 +98,11 @@ class ChannelsDbSpec extends AnyFunSuite {
|
||||
val db = dbs.channels
|
||||
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
|
||||
val channel = ChannelCodecsSpec.normal
|
||||
val futures = for (_ <- 0 until 10000) yield {
|
||||
Future(db.addOrUpdateChannel(channel.modify(_.commitments.channelId).setTo(randomBytes32)))
|
||||
val channelIds = (0 until 10).map(_ => randomBytes32).toList
|
||||
val futures = for (i <- 0 until 10000) yield {
|
||||
val channelId = channelIds(i % channelIds.size)
|
||||
Future(db.addOrUpdateChannel(channel.modify(_.commitments.channelId).setTo(channelId)))
|
||||
Future(db.updateChannelMeta(channelId, ChannelEvent.EventType.PaymentSent))
|
||||
}
|
||||
val res = Future.sequence(futures)
|
||||
Await.result(res, 60 seconds)
|
||||
@ -375,4 +378,4 @@ object ChannelsDbSpec {
|
||||
rs.getTimestampNullable(columnName).map(_.getTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user