kvdb/postgres: separate general sql code from postgres

In this commit, changes are made to the `kvdb/postgres` package so that
all all the non-postgres-specific code is generalised to be applicable
for all sql code. A follow up commit will move all the general sql code
into its own package.
This commit is contained in:
Elle Mouton 2023-01-17 13:46:45 +02:00
parent 3dfaa88bb4
commit 30ba8cbae9
No known key found for this signature in database
GPG Key ID: D7D916376026F177
6 changed files with 201 additions and 74 deletions

View File

@ -20,10 +20,44 @@ const (
kvTableName = "kv"
)
// SqlConfig holds a set of configuration options of a sql database connection.
type SqlConfig struct {
// DriverName is the string that defines the registered sql driver that
// is to be used.
DriverName string
// Dsn is the database connection string that will be used to connect
// to the db.
Dsn string
// Timeout is the time after which a query to the db will be canceled if
// it has not yet completed.
Timeout time.Duration
// Schema is the name of the schema under which the sql tables should be
// created. It should be left empty for backends like sqlite that do not
// support having more than one schema.
Schema string
// TableNamePrefix is the name that should be used as a table name
// prefix when constructing the KV style table.
TableNamePrefix string
// SQLiteCmdReplacements define a one-to-one string mapping of sql
// keywords to the strings that should replace those keywords in any
// commands. Note that the sqlite keywords to be replaced are
// case-sensitive.
SQLiteCmdReplacements SQLiteCmdReplacements
// WithTxLevelLock when set will ensure that there is a transaction
// level lock.
WithTxLevelLock bool
}
// db holds a reference to the postgres connection.
type db struct {
// cfg is the postgres connection config.
cfg *Config
// cfg is the sql db connection config.
cfg *SqlConfig
// prefix is the table name prefix that is used to simulate namespaces.
// We don't use schemas because at least sqlite does not support that.
@ -38,7 +72,8 @@ type db struct {
// db is the underlying database connection instance.
db *sql.DB
// lock is the global write lock that ensures single writer.
// lock is the global write lock that ensures single writer. This is
// only used if cfg.WithTxLevelLock is set.
lock sync.RWMutex
// table is the name of the table that contains the data for all
@ -68,86 +103,45 @@ func Init(maxConnections int) {
dbConns = newDbConnSet(maxConnections)
}
// newPostgresBackend returns a db object initialized with the passed backend
// config. If postgres connection cannot be established, then returns error.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
*db, error) {
// NewSqlBackend returns a db object initialized with the passed backend
// config. If database connection cannot be established, then returns error.
func NewSqlBackend(ctx context.Context, cfg *SqlConfig) (*db, error) {
dbConnsMu.Lock()
defer dbConnsMu.Unlock()
if prefix == "" {
return nil, errors.New("empty postgres prefix")
}
if dbConns == nil {
return nil, errors.New("db connection set not initialized")
}
dbConn, err := dbConns.Open(config.Dsn)
if cfg.TableNamePrefix == "" {
return nil, errors.New("empty table name prefix")
}
table := fmt.Sprintf("%s_%s", cfg.TableNamePrefix, kvTableName)
query := newKVSchemaCreationCmd(
table, cfg.Schema, cfg.SQLiteCmdReplacements,
)
dbConn, err := dbConns.Open(cfg.DriverName, cfg.Dsn)
if err != nil {
return nil, err
}
// Compose system table names.
table := fmt.Sprintf(
"%s_%s", prefix, kvTableName,
)
// Execute the create statements to set up a kv table in postgres. Every
// row points to the bucket that it is one via its parent_id field. A
// NULL parent_id means that the key belongs to the upper-most bucket in
// this table. A constraint on parent_id is enforcing referential
// integrity.
//
// Furthermore there is a <table>_p index on parent_id that is required
// for the foreign key constraint.
//
// Finally there are unique indices on (parent_id, key) to prevent the
// same key being present in a bucket more than once (<table>_up and
// <table>_unp). In postgres, a single index wouldn't enforce the unique
// constraint on rows with a NULL parent_id. Therefore two indices are
// defined.
_, err = dbConn.ExecContext(ctx, `
CREATE SCHEMA IF NOT EXISTS public;
CREATE TABLE IF NOT EXISTS public.`+table+`
(
key bytea NOT NULL,
value bytea,
parent_id bigint,
id bigserial PRIMARY KEY,
sequence bigint,
CONSTRAINT `+table+`_parent FOREIGN KEY (parent_id)
REFERENCES public.`+table+` (id)
ON UPDATE NO ACTION
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS `+table+`_p
ON public.`+table+` (parent_id);
CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_up
ON public.`+table+`
(parent_id, key) WHERE parent_id IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_unp
ON public.`+table+` (key) WHERE parent_id IS NULL;
`)
_, err = dbConn.ExecContext(ctx, query)
if err != nil {
_ = dbConn.Close()
return nil, err
}
backend := &db{
cfg: config,
prefix: prefix,
return &db{
cfg: cfg,
ctx: ctx,
db: dbConn,
table: table,
}
return backend, nil
prefix: cfg.TableNamePrefix,
}, nil
}
// getTimeoutCtx gets a timeout context for database requests.
@ -269,3 +263,28 @@ func (db *db) Close() error {
return dbConns.Close(db.cfg.Dsn)
}
// sqliteCmdReplacements defines a mapping from some SQLite keywords and phrases
// to their postgres counterparts.
var sqliteCmdReplacements = SQLiteCmdReplacements{
"BLOB": "BYTEA",
"INTEGER PRIMARY KEY": "BIGSERIAL PRIMARY KEY",
}
// newPostgresBackend returns a db object initialized with the passed backend
// config. If postgres connection cannot be established, then returns error.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
walletdb.DB, error) {
cfg := &SqlConfig{
DriverName: "pgx",
Dsn: config.Dsn,
Timeout: config.Timeout,
Schema: "public",
TableNamePrefix: prefix,
SQLiteCmdReplacements: sqliteCmdReplacements,
WithTxLevelLock: true,
}
return NewSqlBackend(ctx, cfg)
}

View File

@ -33,7 +33,7 @@ func newDbConnSet(maxConnections int) *dbConnSet {
// Open opens a new database connection. If a connection already exists for the
// given dsn, the existing connection is returned.
func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
func (d *dbConnSet) Open(driver, dsn string) (*sql.DB, error) {
d.mu.Lock()
defer d.mu.Unlock()
@ -43,7 +43,7 @@ func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
return dbConn.db, nil
}
db, err := sql.Open("pgx", dsn)
db, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}

View File

@ -41,7 +41,7 @@ func TestPanic(t *testing.T) {
f, err := NewFixture("")
require.NoError(t, err)
err = f.Db.(*db).Update(func(tx walletdb.ReadWriteTx) error {
err = f.Db.Update(func(tx walletdb.ReadWriteTx) error {
bucket, err := tx.CreateTopLevelBucket([]byte("test"))
require.NoError(t, err)

View File

@ -89,6 +89,15 @@ func (b *readWriteBucket) Get(key []byte) []byte {
panic(err)
}
// When an empty byte array is stored as the value, Sqlite will decode
// that into nil whereas postgres will decode that as an empty byte
// array. Since returning nil is taken to mean that no value has ever
// been written, we ensure here that we at least return an empty array
// so that nil checks will fail.
if len(*value) == 0 {
return []byte{}
}
return *value
}

View File

@ -28,14 +28,17 @@ type readWriteTx struct {
// newReadWriteTx creates an rw transaction using a connection from the
// specified pool.
func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
// Obtain the global lock instance. An alternative here is to obtain a
// database lock from Postgres. Unfortunately there is no database-level
// lock in Postgres, meaning that each table would need to be locked
// individually. Perhaps an advisory lock could perform this function
// too.
var locker sync.Locker = &db.lock
if readOnly {
locker = db.lock.RLocker()
locker := newNoopLocker()
if db.cfg.WithTxLevelLock {
// Obtain the global lock instance. An alternative here is to
// obtain a database lock from Postgres. Unfortunately there is
// no database-level lock in Postgres, meaning that each table
// would need to be locked individually. Perhaps an advisory
// lock could perform this function too.
locker = &db.lock
if readOnly {
locker = db.lock.RLocker()
}
}
locker.Lock()
@ -198,3 +201,25 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result,
return tx.tx.ExecContext(ctx, query, args...)
}
// noopLocker is an implementation of a no-op sync.Locker.
type noopLocker struct{}
// newNoopLocker creates a new noopLocker.
func newNoopLocker() sync.Locker {
return &noopLocker{}
}
// Lock is a noop.
//
// NOTE: this is part of the sync.Locker interface.
func (n *noopLocker) Lock() {
}
// Unlock is a noop.
//
// NOTE: this is part of the sync.Locker interface.
func (n *noopLocker) Unlock() {
}
var _ sync.Locker = (*noopLocker)(nil)

74
kvdb/postgres/schema.go Normal file
View File

@ -0,0 +1,74 @@
//go:build kvdb_postgres
package postgres
import (
"fmt"
"strings"
)
// SQLiteCmdReplacements is a one to one mapping of sqlite keywords that should
// be replaced by the mapped strings in any command. Note that the sqlite
// keywords to be replaced are case-sensitive.
type SQLiteCmdReplacements map[string]string
func newKVSchemaCreationCmd(table, schema string,
replacements SQLiteCmdReplacements) string {
var (
tableInSchema = table
finalCmd string
)
if schema != "" {
finalCmd = fmt.Sprintf(
`CREATE SCHEMA IF NOT EXISTS ` + schema + `;`,
)
tableInSchema = fmt.Sprintf("%s.%s", schema, table)
}
// Construct the sql statements to set up a kv table in postgres. Every
// row points to the bucket that it is one via its parent_id field. A
// NULL parent_id means that the key belongs to the uppermost bucket in
// this table. A constraint on parent_id is enforcing referential
// integrity.
//
// Furthermore, there is a <table>_p index on parent_id that is required
// for the foreign key constraint.
//
// Finally, there are unique indices on (parent_id, key) to prevent the
// same key being present in a bucket more than once (<table>_up and
// <table>_unp). In postgres, a single index wouldn't enforce the unique
// constraint on rows with a NULL parent_id. Therefore, two indices are
// defined.
//
// The replacements map can be used to replace any sqlite keywords.
// Callers should note that the sqlite keywords are case-sensitive.
finalCmd += fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS ` + tableInSchema + `
(
key BLOB NOT NULL,
value BLOB,
parent_id BIGINT,
id INTEGER PRIMARY KEY,
sequence BIGINT,
CONSTRAINT ` + table + `_parent FOREIGN KEY (parent_id)
REFERENCES ` + tableInSchema + ` (id)
ON UPDATE NO ACTION
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS ` + table + `_p
ON ` + tableInSchema + ` (parent_id);
CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_up
ON ` + tableInSchema + `
(parent_id, key) WHERE parent_id IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_unp
ON ` + tableInSchema + ` (key) WHERE parent_id IS NULL;
`)
for from, to := range replacements {
finalCmd = strings.Replace(finalCmd, from, to, -1)
}
return finalCmd
}