1
0
Fork 0
mirror of https://github.com/romanz/electrs.git synced 2025-02-24 15:02:21 +01:00

Switch to spacejam/rust-rocksdb crate

for latest RocksDB (5.14.2)
This commit is contained in:
Roman Zeyde 2018-07-21 23:36:25 +03:00
parent 18a21bfe56
commit fa3ea2869a
No known key found for this signature in database
GPG key ID: 87CAE5FA46917CBB
2 changed files with 51 additions and 41 deletions

View file

@ -24,7 +24,7 @@ hex = "0.3"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
prometheus = "0.4" prometheus = "0.4"
rocksdb = { git = "https://github.com/pingcap/rust-rocksdb", rev = "9a1c83c5382fbaee8a5102213c711bbe52d71470" } rocksdb = "0.10.1"
rust-crypto = "0.2" rust-crypto = "0.2"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View file

@ -1,7 +1,6 @@
use rocksdb; use rocksdb;
use rocksdb::Writable;
use std::path::Path; use std::path::{Path, PathBuf};
use util::Bytes; use util::Bytes;
@ -27,47 +26,54 @@ pub trait WriteStore: Sync {
fn flush(&self); fn flush(&self);
} }
pub struct DBStore { #[derive(Clone)]
db: rocksdb::DB, struct Options {
path: PathBuf,
bulk_import: bool, bulk_import: bool,
} }
impl DBStore { pub struct DBStore {
/// Opens a new RocksDB at the specified location. db: rocksdb::DB,
pub fn open(path: &Path) -> Self { opts: Options,
let path = path.to_str().unwrap(); }
debug!("opening DB at {:?}", path);
let mut db_opts = rocksdb::DBOptions::default();
db_opts.create_if_missing(true);
db_opts.set_keep_log_file_num(10);
db_opts.set_compaction_readahead_size(2 << 20);
let mut cf_opts = rocksdb::ColumnFamilyOptions::new(); impl DBStore {
cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); fn open_opts(opts: Options) -> Self {
cf_opts.compression(rocksdb::DBCompressionType::Snappy); debug!("opening DB at {:?}", opts.path);
cf_opts.set_target_file_size_base(128 << 20); let mut db_opts = rocksdb::Options::default();
cf_opts.set_write_buffer_size(64 << 20); db_opts.create_if_missing(true);
cf_opts.set_min_write_buffer_number(2); // db_opts.set_keep_log_file_num(10);
cf_opts.set_max_write_buffer_number(3); db_opts.set_compaction_readahead_size(2 << 20);
cf_opts.set_disable_auto_compactions(true); // for initial bulk load db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
db_opts.set_target_file_size_base(128 << 20);
db_opts.set_write_buffer_size(64 << 20);
db_opts.set_min_write_buffer_number(2);
db_opts.set_max_write_buffer_number(3);
db_opts.set_disable_auto_compactions(opts.bulk_import); // for initial bulk load
let mut block_opts = rocksdb::BlockBasedOptions::default(); let mut block_opts = rocksdb::BlockBasedOptions::default();
block_opts.set_block_size(1 << 20); block_opts.set_block_size(1 << 20);
DBStore { DBStore {
db: rocksdb::DB::open_cf(db_opts, path, vec![("default", cf_opts)]).unwrap(), db: rocksdb::DB::open(&db_opts, &opts.path).unwrap(),
bulk_import: true, opts,
} }
} }
pub fn enable_compaction(mut self) -> Self { /// Opens a new RocksDB at the specified location.
self.bulk_import = false; pub fn open(path: &Path) -> Self {
{ DBStore::open_opts(Options {
let cf = self.db.cf_handle("default").expect("no default CF"); path: path.to_path_buf(),
self.db bulk_import: true,
.set_options_cf(cf, &vec![("disable_auto_compactions", "false")]) })
.expect("failed to enable auto compactions"); }
}
self pub fn enable_compaction(self) -> Self {
let mut opts = self.opts.clone();
opts.bulk_import = false;
drop(self);
// DB must be closed before being re-opened:
DBStore::open_opts(opts)
} }
pub fn put(&self, key: &[u8], value: &[u8]) { pub fn put(&self, key: &[u8], value: &[u8]) {
@ -89,13 +95,17 @@ impl ReadStore for DBStore {
// TODO: use generators // TODO: use generators
fn scan(&self, prefix: &[u8]) -> Vec<Row> { fn scan(&self, prefix: &[u8]) -> Vec<Row> {
let mut rows = vec![]; let mut rows = vec![];
let mut iter = self.db.iter(); for (key, value) in self.db.iterator(rocksdb::IteratorMode::From(
iter.seek(rocksdb::SeekKey::Key(prefix)); prefix,
for (key, value) in &mut iter { rocksdb::Direction::Forward,
)) {
if !key.starts_with(prefix) { if !key.starts_with(prefix) {
break; break;
} }
rows.push(Row { key, value }); rows.push(Row {
key: key.to_vec(),
value: value.to_vec(),
});
} }
rows rows
} }
@ -103,13 +113,13 @@ impl ReadStore for DBStore {
impl WriteStore for DBStore { impl WriteStore for DBStore {
fn write(&self, rows: Vec<Row>) { fn write(&self, rows: Vec<Row>) {
let batch = rocksdb::WriteBatch::default(); let mut batch = rocksdb::WriteBatch::default();
for row in rows { for row in rows {
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
} }
let mut opts = rocksdb::WriteOptions::new(); let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(!self.bulk_import); opts.set_sync(!self.opts.bulk_import);
opts.disable_wal(self.bulk_import); opts.disable_wal(self.opts.bulk_import);
self.db.write_opt(batch, &opts).unwrap(); self.db.write_opt(batch, &opts).unwrap();
} }
@ -124,6 +134,6 @@ impl WriteStore for DBStore {
impl Drop for DBStore { impl Drop for DBStore {
fn drop(&mut self) { fn drop(&mut self) {
trace!("closing DB"); trace!("closing DB at {:?}", self.opts.path);
} }
} }