mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
kvdb/postgres: remove global application level lock
In this commit, we remove the global application level lock from the postgres backend. This lock prevents multiple write transactions from happening at the same time, and will also block a writer if a read is on going. Since this lock was added, we know always open DB connections with the strongest level of concurrency control available: `LevelSerializable`. In concert with the new auto retry logic, we ensure that if db transactions conflict (writing the same key/row in this case), then the tx is retried automatically. Removing this lock should increase perf for the postgres backend, as now concurrent write transactions can proceed, being serialized as needed. Rather then trying to handle concurrency at the application level, we'll set postgres do its job, with the application only needing to retry as necessary.
This commit is contained in:
parent
1acc8393bc
commit
43a1ca4f3d
@ -28,7 +28,6 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
|
|||||||
Schema: "public",
|
Schema: "public",
|
||||||
TableNamePrefix: prefix,
|
TableNamePrefix: prefix,
|
||||||
SQLiteCmdReplacements: sqliteCmdReplacements,
|
SQLiteCmdReplacements: sqliteCmdReplacements,
|
||||||
WithTxLevelLock: true,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return sqlbase.NewSqlBackend(ctx, cfg)
|
return sqlbase.NewSqlBackend(ctx, cfg)
|
||||||
|
@ -55,10 +55,6 @@ type Config struct {
|
|||||||
// commands. Note that the sqlite keywords to be replaced are
|
// commands. Note that the sqlite keywords to be replaced are
|
||||||
// case-sensitive.
|
// case-sensitive.
|
||||||
SQLiteCmdReplacements SQLiteCmdReplacements
|
SQLiteCmdReplacements SQLiteCmdReplacements
|
||||||
|
|
||||||
// WithTxLevelLock when set will ensure that there is a transaction
|
|
||||||
// level lock.
|
|
||||||
WithTxLevelLock bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// db holds a reference to the sql db connection.
|
// db holds a reference to the sql db connection.
|
||||||
@ -79,10 +75,6 @@ type db struct {
|
|||||||
// db is the underlying database connection instance.
|
// db is the underlying database connection instance.
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
||||||
// lock is the global write lock that ensures single writer. This is
|
|
||||||
// only used if cfg.WithTxLevelLock is set.
|
|
||||||
lock sync.RWMutex
|
|
||||||
|
|
||||||
// table is the name of the table that contains the data for all
|
// table is the name of the table that contains the data for all
|
||||||
// top-level buckets that have keys that cannot be mapped to a distinct
|
// top-level buckets that have keys that cannot be mapped to a distinct
|
||||||
// sql table.
|
// sql table.
|
||||||
|
@ -5,7 +5,6 @@ package sqlbase
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/btcsuite/btcwallet/walletdb"
|
"github.com/btcsuite/btcwallet/walletdb"
|
||||||
)
|
)
|
||||||
@ -20,28 +19,11 @@ type readWriteTx struct {
|
|||||||
|
|
||||||
// active is true if the transaction hasn't been committed yet.
|
// active is true if the transaction hasn't been committed yet.
|
||||||
active bool
|
active bool
|
||||||
|
|
||||||
// locker is a pointer to the global db lock.
|
|
||||||
locker sync.Locker
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newReadWriteTx creates an rw transaction using a connection from the
|
// newReadWriteTx creates an rw transaction using a connection from the
|
||||||
// specified pool.
|
// specified pool.
|
||||||
func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
|
func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
|
||||||
locker := newNoopLocker()
|
|
||||||
if db.cfg.WithTxLevelLock {
|
|
||||||
// Obtain the global lock instance. An alternative here is to
|
|
||||||
// obtain a database lock from Postgres. Unfortunately there is
|
|
||||||
// no database-level lock in Postgres, meaning that each table
|
|
||||||
// would need to be locked individually. Perhaps an advisory
|
|
||||||
// lock could perform this function too.
|
|
||||||
locker = &db.lock
|
|
||||||
if readOnly {
|
|
||||||
locker = db.lock.RLocker()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
locker.Lock()
|
|
||||||
|
|
||||||
// Start the transaction. Don't use the timeout context because it would
|
// Start the transaction. Don't use the timeout context because it would
|
||||||
// be applied to the transaction as a whole. If possible, mark the
|
// be applied to the transaction as a whole. If possible, mark the
|
||||||
// transaction as read-only to make sure that potential programming
|
// transaction as read-only to make sure that potential programming
|
||||||
@ -54,7 +36,6 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
locker.Unlock()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,7 +43,6 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
|
|||||||
db: db,
|
db: db,
|
||||||
tx: tx,
|
tx: tx,
|
||||||
active: true,
|
active: true,
|
||||||
locker: locker,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,7 +74,6 @@ func (tx *readWriteTx) Rollback() error {
|
|||||||
|
|
||||||
// Unlock the transaction regardless of the error result.
|
// Unlock the transaction regardless of the error result.
|
||||||
tx.active = false
|
tx.active = false
|
||||||
tx.locker.Unlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,7 +141,6 @@ func (tx *readWriteTx) Commit() error {
|
|||||||
|
|
||||||
// Unlock the transaction regardless of the error result.
|
// Unlock the transaction regardless of the error result.
|
||||||
tx.active = false
|
tx.active = false
|
||||||
tx.locker.Unlock()
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -204,25 +182,3 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result,
|
|||||||
|
|
||||||
return tx.tx.ExecContext(ctx, query, args...)
|
return tx.tx.ExecContext(ctx, query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// noopLocker is an implementation of a no-op sync.Locker.
|
|
||||||
type noopLocker struct{}
|
|
||||||
|
|
||||||
// newNoopLocker creates a new noopLocker.
|
|
||||||
func newNoopLocker() sync.Locker {
|
|
||||||
return &noopLocker{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock is a noop.
|
|
||||||
//
|
|
||||||
// NOTE: this is part of the sync.Locker interface.
|
|
||||||
func (n *noopLocker) Lock() {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock is a noop.
|
|
||||||
//
|
|
||||||
// NOTE: this is part of the sync.Locker interface.
|
|
||||||
func (n *noopLocker) Unlock() {
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ sync.Locker = (*noopLocker)(nil)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user