From 30ba8cbae9af18914897297a777d6159861a21ca Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 17 Jan 2023 13:46:45 +0200 Subject: [PATCH] kvdb/postgres: separate general sql code from postgres In this commit, changes are made to the `kvdb/postgres` package so that all all the non-postgres-specific code is generalised to be applicable for all sql code. A follow up commit will move all the general sql code into its own package. --- kvdb/postgres/db.go | 145 +++++++++++++++++------------- kvdb/postgres/db_conn_set.go | 4 +- kvdb/postgres/db_test.go | 2 +- kvdb/postgres/readwrite_bucket.go | 9 ++ kvdb/postgres/readwrite_tx.go | 41 +++++++-- kvdb/postgres/schema.go | 74 +++++++++++++++ 6 files changed, 201 insertions(+), 74 deletions(-) create mode 100644 kvdb/postgres/schema.go diff --git a/kvdb/postgres/db.go b/kvdb/postgres/db.go index aa517e652..2c31d46e0 100644 --- a/kvdb/postgres/db.go +++ b/kvdb/postgres/db.go @@ -20,10 +20,44 @@ const ( kvTableName = "kv" ) +// SqlConfig holds a set of configuration options of a sql database connection. +type SqlConfig struct { + // DriverName is the string that defines the registered sql driver that + // is to be used. + DriverName string + + // Dsn is the database connection string that will be used to connect + // to the db. + Dsn string + + // Timeout is the time after which a query to the db will be canceled if + // it has not yet completed. + Timeout time.Duration + + // Schema is the name of the schema under which the sql tables should be + // created. It should be left empty for backends like sqlite that do not + // support having more than one schema. + Schema string + + // TableNamePrefix is the name that should be used as a table name + // prefix when constructing the KV style table. + TableNamePrefix string + + // SQLiteCmdReplacements define a one-to-one string mapping of sql + // keywords to the strings that should replace those keywords in any + // commands. Note that the sqlite keywords to be replaced are + // case-sensitive. + SQLiteCmdReplacements SQLiteCmdReplacements + + // WithTxLevelLock when set will ensure that there is a transaction + // level lock. + WithTxLevelLock bool +} + // db holds a reference to the postgres connection. type db struct { - // cfg is the postgres connection config. - cfg *Config + // cfg is the sql db connection config. + cfg *SqlConfig // prefix is the table name prefix that is used to simulate namespaces. // We don't use schemas because at least sqlite does not support that. @@ -38,7 +72,8 @@ type db struct { // db is the underlying database connection instance. db *sql.DB - // lock is the global write lock that ensures single writer. + // lock is the global write lock that ensures single writer. This is + // only used if cfg.WithTxLevelLock is set. lock sync.RWMutex // table is the name of the table that contains the data for all @@ -68,86 +103,45 @@ func Init(maxConnections int) { dbConns = newDbConnSet(maxConnections) } -// newPostgresBackend returns a db object initialized with the passed backend -// config. If postgres connection cannot be established, then returns error. -func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( - *db, error) { - +// NewSqlBackend returns a db object initialized with the passed backend +// config. If database connection cannot be established, then returns error. +func NewSqlBackend(ctx context.Context, cfg *SqlConfig) (*db, error) { dbConnsMu.Lock() defer dbConnsMu.Unlock() - if prefix == "" { - return nil, errors.New("empty postgres prefix") - } - if dbConns == nil { return nil, errors.New("db connection set not initialized") } - dbConn, err := dbConns.Open(config.Dsn) + if cfg.TableNamePrefix == "" { + return nil, errors.New("empty table name prefix") + } + + table := fmt.Sprintf("%s_%s", cfg.TableNamePrefix, kvTableName) + + query := newKVSchemaCreationCmd( + table, cfg.Schema, cfg.SQLiteCmdReplacements, + ) + + dbConn, err := dbConns.Open(cfg.DriverName, cfg.Dsn) if err != nil { return nil, err } - // Compose system table names. - table := fmt.Sprintf( - "%s_%s", prefix, kvTableName, - ) - - // Execute the create statements to set up a kv table in postgres. Every - // row points to the bucket that it is one via its parent_id field. A - // NULL parent_id means that the key belongs to the upper-most bucket in - // this table. A constraint on parent_id is enforcing referential - // integrity. - // - // Furthermore there is a _p index on parent_id that is required - // for the foreign key constraint. - // - // Finally there are unique indices on (parent_id, key) to prevent the - // same key being present in a bucket more than once (
_up and - //
_unp). In postgres, a single index wouldn't enforce the unique - // constraint on rows with a NULL parent_id. Therefore two indices are - // defined. - _, err = dbConn.ExecContext(ctx, ` -CREATE SCHEMA IF NOT EXISTS public; -CREATE TABLE IF NOT EXISTS public.`+table+` -( - key bytea NOT NULL, - value bytea, - parent_id bigint, - id bigserial PRIMARY KEY, - sequence bigint, - CONSTRAINT `+table+`_parent FOREIGN KEY (parent_id) - REFERENCES public.`+table+` (id) - ON UPDATE NO ACTION - ON DELETE CASCADE -); - -CREATE INDEX IF NOT EXISTS `+table+`_p - ON public.`+table+` (parent_id); - -CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_up - ON public.`+table+` - (parent_id, key) WHERE parent_id IS NOT NULL; - -CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_unp - ON public.`+table+` (key) WHERE parent_id IS NULL; -`) + _, err = dbConn.ExecContext(ctx, query) if err != nil { _ = dbConn.Close() return nil, err } - backend := &db{ - cfg: config, - prefix: prefix, + return &db{ + cfg: cfg, ctx: ctx, db: dbConn, table: table, - } - - return backend, nil + prefix: cfg.TableNamePrefix, + }, nil } // getTimeoutCtx gets a timeout context for database requests. @@ -269,3 +263,28 @@ func (db *db) Close() error { return dbConns.Close(db.cfg.Dsn) } + +// sqliteCmdReplacements defines a mapping from some SQLite keywords and phrases +// to their postgres counterparts. +var sqliteCmdReplacements = SQLiteCmdReplacements{ + "BLOB": "BYTEA", + "INTEGER PRIMARY KEY": "BIGSERIAL PRIMARY KEY", +} + +// newPostgresBackend returns a db object initialized with the passed backend +// config. If postgres connection cannot be established, then returns error. +func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( + walletdb.DB, error) { + + cfg := &SqlConfig{ + DriverName: "pgx", + Dsn: config.Dsn, + Timeout: config.Timeout, + Schema: "public", + TableNamePrefix: prefix, + SQLiteCmdReplacements: sqliteCmdReplacements, + WithTxLevelLock: true, + } + + return NewSqlBackend(ctx, cfg) +} diff --git a/kvdb/postgres/db_conn_set.go b/kvdb/postgres/db_conn_set.go index ced065969..736f8516b 100644 --- a/kvdb/postgres/db_conn_set.go +++ b/kvdb/postgres/db_conn_set.go @@ -33,7 +33,7 @@ func newDbConnSet(maxConnections int) *dbConnSet { // Open opens a new database connection. If a connection already exists for the // given dsn, the existing connection is returned. -func (d *dbConnSet) Open(dsn string) (*sql.DB, error) { +func (d *dbConnSet) Open(driver, dsn string) (*sql.DB, error) { d.mu.Lock() defer d.mu.Unlock() @@ -43,7 +43,7 @@ func (d *dbConnSet) Open(dsn string) (*sql.DB, error) { return dbConn.db, nil } - db, err := sql.Open("pgx", dsn) + db, err := sql.Open(driver, dsn) if err != nil { return nil, err } diff --git a/kvdb/postgres/db_test.go b/kvdb/postgres/db_test.go index d397223d1..0378b4dbb 100644 --- a/kvdb/postgres/db_test.go +++ b/kvdb/postgres/db_test.go @@ -41,7 +41,7 @@ func TestPanic(t *testing.T) { f, err := NewFixture("") require.NoError(t, err) - err = f.Db.(*db).Update(func(tx walletdb.ReadWriteTx) error { + err = f.Db.Update(func(tx walletdb.ReadWriteTx) error { bucket, err := tx.CreateTopLevelBucket([]byte("test")) require.NoError(t, err) diff --git a/kvdb/postgres/readwrite_bucket.go b/kvdb/postgres/readwrite_bucket.go index f71db50bd..71856b213 100644 --- a/kvdb/postgres/readwrite_bucket.go +++ b/kvdb/postgres/readwrite_bucket.go @@ -89,6 +89,15 @@ func (b *readWriteBucket) Get(key []byte) []byte { panic(err) } + // When an empty byte array is stored as the value, Sqlite will decode + // that into nil whereas postgres will decode that as an empty byte + // array. Since returning nil is taken to mean that no value has ever + // been written, we ensure here that we at least return an empty array + // so that nil checks will fail. + if len(*value) == 0 { + return []byte{} + } + return *value } diff --git a/kvdb/postgres/readwrite_tx.go b/kvdb/postgres/readwrite_tx.go index 592128ad6..c31b05603 100644 --- a/kvdb/postgres/readwrite_tx.go +++ b/kvdb/postgres/readwrite_tx.go @@ -28,14 +28,17 @@ type readWriteTx struct { // newReadWriteTx creates an rw transaction using a connection from the // specified pool. func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { - // Obtain the global lock instance. An alternative here is to obtain a - // database lock from Postgres. Unfortunately there is no database-level - // lock in Postgres, meaning that each table would need to be locked - // individually. Perhaps an advisory lock could perform this function - // too. - var locker sync.Locker = &db.lock - if readOnly { - locker = db.lock.RLocker() + locker := newNoopLocker() + if db.cfg.WithTxLevelLock { + // Obtain the global lock instance. An alternative here is to + // obtain a database lock from Postgres. Unfortunately there is + // no database-level lock in Postgres, meaning that each table + // would need to be locked individually. Perhaps an advisory + // lock could perform this function too. + locker = &db.lock + if readOnly { + locker = db.lock.RLocker() + } } locker.Lock() @@ -198,3 +201,25 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result, return tx.tx.ExecContext(ctx, query, args...) } + +// noopLocker is an implementation of a no-op sync.Locker. +type noopLocker struct{} + +// newNoopLocker creates a new noopLocker. +func newNoopLocker() sync.Locker { + return &noopLocker{} +} + +// Lock is a noop. +// +// NOTE: this is part of the sync.Locker interface. +func (n *noopLocker) Lock() { +} + +// Unlock is a noop. +// +// NOTE: this is part of the sync.Locker interface. +func (n *noopLocker) Unlock() { +} + +var _ sync.Locker = (*noopLocker)(nil) diff --git a/kvdb/postgres/schema.go b/kvdb/postgres/schema.go new file mode 100644 index 000000000..9276326e3 --- /dev/null +++ b/kvdb/postgres/schema.go @@ -0,0 +1,74 @@ +//go:build kvdb_postgres + +package postgres + +import ( + "fmt" + "strings" +) + +// SQLiteCmdReplacements is a one to one mapping of sqlite keywords that should +// be replaced by the mapped strings in any command. Note that the sqlite +// keywords to be replaced are case-sensitive. +type SQLiteCmdReplacements map[string]string + +func newKVSchemaCreationCmd(table, schema string, + replacements SQLiteCmdReplacements) string { + + var ( + tableInSchema = table + finalCmd string + ) + if schema != "" { + finalCmd = fmt.Sprintf( + `CREATE SCHEMA IF NOT EXISTS ` + schema + `;`, + ) + + tableInSchema = fmt.Sprintf("%s.%s", schema, table) + } + + // Construct the sql statements to set up a kv table in postgres. Every + // row points to the bucket that it is one via its parent_id field. A + // NULL parent_id means that the key belongs to the uppermost bucket in + // this table. A constraint on parent_id is enforcing referential + // integrity. + // + // Furthermore, there is a
_p index on parent_id that is required + // for the foreign key constraint. + // + // Finally, there are unique indices on (parent_id, key) to prevent the + // same key being present in a bucket more than once (
_up and + //
_unp). In postgres, a single index wouldn't enforce the unique + // constraint on rows with a NULL parent_id. Therefore, two indices are + // defined. + // + // The replacements map can be used to replace any sqlite keywords. + // Callers should note that the sqlite keywords are case-sensitive. + finalCmd += fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS ` + tableInSchema + ` +( + key BLOB NOT NULL, + value BLOB, + parent_id BIGINT, + id INTEGER PRIMARY KEY, + sequence BIGINT, + CONSTRAINT ` + table + `_parent FOREIGN KEY (parent_id) + REFERENCES ` + tableInSchema + ` (id) + ON UPDATE NO ACTION + ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS ` + table + `_p + ON ` + tableInSchema + ` (parent_id); +CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_up + ON ` + tableInSchema + ` + (parent_id, key) WHERE parent_id IS NOT NULL; +CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_unp + ON ` + tableInSchema + ` (key) WHERE parent_id IS NULL; +`) + + for from, to := range replacements { + finalCmd = strings.Replace(finalCmd, from, to, -1) + } + + return finalCmd +}