From 67419a7c0c6410761ee369c1d24aba8641b8e400 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 28 Oct 2024 09:35:18 +0200 Subject: [PATCH] Revert "kvdb/postgres: remove global application level lock" This reverts commit 43a1ca4f3d70876ae7cad57fdb7a09704ae21a3e. --- kvdb/postgres/db.go | 1 + kvdb/sqlbase/db.go | 8 +++++++ kvdb/sqlbase/readwrite_tx.go | 44 ++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/kvdb/postgres/db.go b/kvdb/postgres/db.go index 425ba1622..90ca8324a 100644 --- a/kvdb/postgres/db.go +++ b/kvdb/postgres/db.go @@ -28,6 +28,7 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( Schema: "public", TableNamePrefix: prefix, SQLiteCmdReplacements: sqliteCmdReplacements, + WithTxLevelLock: true, } return sqlbase.NewSqlBackend(ctx, cfg) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index 6ef085712..221a77bfd 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -55,6 +55,10 @@ type Config struct { // commands. Note that the sqlite keywords to be replaced are // case-sensitive. SQLiteCmdReplacements SQLiteCmdReplacements + + // WithTxLevelLock when set will ensure that there is a transaction + // level lock. + WithTxLevelLock bool } // db holds a reference to the sql db connection. @@ -75,6 +79,10 @@ type db struct { // db is the underlying database connection instance. db *sql.DB + // lock is the global write lock that ensures single writer. This is + // only used if cfg.WithTxLevelLock is set. + lock sync.RWMutex + // table is the name of the table that contains the data for all // top-level buckets that have keys that cannot be mapped to a distinct // sql table. diff --git a/kvdb/sqlbase/readwrite_tx.go b/kvdb/sqlbase/readwrite_tx.go index 18a6a682c..ec761931a 100644 --- a/kvdb/sqlbase/readwrite_tx.go +++ b/kvdb/sqlbase/readwrite_tx.go @@ -5,6 +5,7 @@ package sqlbase import ( "context" "database/sql" + "sync" "github.com/btcsuite/btcwallet/walletdb" ) @@ -19,11 +20,28 @@ type readWriteTx struct { // active is true if the transaction hasn't been committed yet. active bool + + // locker is a pointer to the global db lock. + locker sync.Locker } // newReadWriteTx creates an rw transaction using a connection from the // specified pool. func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { + locker := newNoopLocker() + if db.cfg.WithTxLevelLock { + // Obtain the global lock instance. An alternative here is to + // obtain a database lock from Postgres. Unfortunately there is + // no database-level lock in Postgres, meaning that each table + // would need to be locked individually. Perhaps an advisory + // lock could perform this function too. + locker = &db.lock + if readOnly { + locker = db.lock.RLocker() + } + } + locker.Lock() + // Start the transaction. Don't use the timeout context because it would // be applied to the transaction as a whole. If possible, mark the // transaction as read-only to make sure that potential programming @@ -36,6 +54,7 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { }, ) if err != nil { + locker.Unlock() return nil, err } @@ -43,6 +62,7 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { db: db, tx: tx, active: true, + locker: locker, }, nil } @@ -74,6 +94,7 @@ func (tx *readWriteTx) Rollback() error { // Unlock the transaction regardless of the error result. tx.active = false + tx.locker.Unlock() return err } @@ -141,6 +162,7 @@ func (tx *readWriteTx) Commit() error { // Unlock the transaction regardless of the error result. tx.active = false + tx.locker.Unlock() return err } @@ -182,3 +204,25 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result, return tx.tx.ExecContext(ctx, query, args...) } + +// noopLocker is an implementation of a no-op sync.Locker. +type noopLocker struct{} + +// newNoopLocker creates a new noopLocker. +func newNoopLocker() sync.Locker { + return &noopLocker{} +} + +// Lock is a noop. +// +// NOTE: this is part of the sync.Locker interface. +func (n *noopLocker) Lock() { +} + +// Unlock is a noop. +// +// NOTE: this is part of the sync.Locker interface. +func (n *noopLocker) Unlock() { +} + +var _ sync.Locker = (*noopLocker)(nil)