mirror of
https://github.com/romanz/electrs.git
synced 2025-02-21 22:21:58 +01:00
Allow concurrent DB background operations for SSDs
Full compactions can be done much faster when running with `db_parallelism=8`.
This commit is contained in:
parent
6bfaba99d5
commit
ed6f38f476
4 changed files with 40 additions and 19 deletions
|
@ -34,6 +34,12 @@ name = "db_log_dir"
|
|||
type = "std::path::PathBuf"
|
||||
doc = "Directory to store index database internal log (default: same as specified by `db_dir`)"
|
||||
|
||||
[[param]]
|
||||
name = "db_parallelism"
|
||||
type = "u8"
|
||||
doc = "Max threads to use for DB background operations (flushes and compactions)"
|
||||
default = "1"
|
||||
|
||||
[[param]]
|
||||
name = "daemon_dir"
|
||||
type = "std::path::PathBuf"
|
||||
|
|
|
@ -127,6 +127,7 @@ pub struct Config {
|
|||
pub network: Network,
|
||||
pub db_path: PathBuf,
|
||||
pub db_log_dir: Option<PathBuf>,
|
||||
pub db_parallelism: u8,
|
||||
pub daemon_auth: SensitiveAuth,
|
||||
pub daemon_rpc_addr: SocketAddr,
|
||||
pub daemon_p2p_addr: SocketAddr,
|
||||
|
@ -354,6 +355,7 @@ impl Config {
|
|||
network: config.network,
|
||||
db_path: config.db_dir,
|
||||
db_log_dir: config.db_log_dir,
|
||||
db_parallelism: config.db_parallelism,
|
||||
daemon_auth,
|
||||
daemon_rpc_addr,
|
||||
daemon_p2p_addr,
|
||||
|
|
50
src/db.rs
50
src/db.rs
|
@ -95,11 +95,14 @@ impl Default for Config {
|
|||
}
|
||||
}
|
||||
|
||||
fn default_opts() -> rocksdb::Options {
|
||||
fn default_opts(parallelism: u8) -> rocksdb::Options {
|
||||
let mut block_opts = rocksdb::BlockBasedOptions::default();
|
||||
block_opts.set_checksum_type(rocksdb::ChecksumType::CRC32c);
|
||||
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.increase_parallelism(parallelism.into());
|
||||
opts.set_max_subcompactions(parallelism.into());
|
||||
|
||||
opts.set_keep_log_file_num(10);
|
||||
opts.set_max_open_files(16);
|
||||
opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||
|
@ -114,23 +117,27 @@ fn default_opts() -> rocksdb::Options {
|
|||
}
|
||||
|
||||
impl DBStore {
|
||||
fn create_cf_descriptors() -> Vec<rocksdb::ColumnFamilyDescriptor> {
|
||||
fn create_cf_descriptors(parallelism: u8) -> Vec<rocksdb::ColumnFamilyDescriptor> {
|
||||
COLUMN_FAMILIES
|
||||
.iter()
|
||||
.map(|&name| rocksdb::ColumnFamilyDescriptor::new(name, default_opts()))
|
||||
.map(|&name| rocksdb::ColumnFamilyDescriptor::new(name, default_opts(parallelism)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn open_internal(path: &Path, log_dir: Option<&Path>) -> Result<Self> {
|
||||
let mut db_opts = default_opts();
|
||||
fn open_internal(path: &Path, log_dir: Option<&Path>, parallelism: u8) -> Result<Self> {
|
||||
let mut db_opts = default_opts(parallelism);
|
||||
db_opts.create_if_missing(true);
|
||||
db_opts.create_missing_column_families(true);
|
||||
if let Some(d) = log_dir {
|
||||
db_opts.set_db_log_dir(d);
|
||||
}
|
||||
|
||||
let db = rocksdb::DB::open_cf_descriptors(&db_opts, path, Self::create_cf_descriptors())
|
||||
.with_context(|| format!("failed to open DB: {}", path.display()))?;
|
||||
let db = rocksdb::DB::open_cf_descriptors(
|
||||
&db_opts,
|
||||
path,
|
||||
Self::create_cf_descriptors(parallelism),
|
||||
)
|
||||
.with_context(|| format!("failed to open DB: {}", path.display()))?;
|
||||
let live_files = db.live_files()?;
|
||||
info!(
|
||||
"{:?}: {} SST files, {} GB, {} Grows",
|
||||
|
@ -155,8 +162,13 @@ impl DBStore {
|
|||
}
|
||||
|
||||
/// Opens a new RocksDB at the specified location.
|
||||
pub fn open(path: &Path, log_dir: Option<&Path>, auto_reindex: bool) -> Result<Self> {
|
||||
let mut store = Self::open_internal(path, log_dir)?;
|
||||
pub fn open(
|
||||
path: &Path,
|
||||
log_dir: Option<&Path>,
|
||||
auto_reindex: bool,
|
||||
parallelism: u8,
|
||||
) -> Result<Self> {
|
||||
let mut store = Self::open_internal(path, log_dir, parallelism)?;
|
||||
let config = store.get_config();
|
||||
debug!("DB {:?}", config);
|
||||
let mut config = config.unwrap_or_default(); // use default config when DB is empty
|
||||
|
@ -182,13 +194,13 @@ impl DBStore {
|
|||
);
|
||||
// close DB before deletion
|
||||
drop(store);
|
||||
rocksdb::DB::destroy(&default_opts(), path).with_context(|| {
|
||||
rocksdb::DB::destroy(&default_opts(parallelism), path).with_context(|| {
|
||||
format!(
|
||||
"re-index required but the old database ({}) can not be deleted",
|
||||
path.display()
|
||||
)
|
||||
})?;
|
||||
store = Self::open_internal(path, log_dir)?;
|
||||
store = Self::open_internal(path, log_dir, parallelism)?;
|
||||
config = Config::default(); // re-init config after dropping DB
|
||||
}
|
||||
if config.compacted {
|
||||
|
@ -432,13 +444,13 @@ mod tests {
|
|||
fn test_reindex_new_format() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
{
|
||||
let store = DBStore::open(dir.path(), None, false).unwrap();
|
||||
let store = DBStore::open(dir.path(), None, false, 1).unwrap();
|
||||
let mut config = store.get_config().unwrap();
|
||||
config.format += 1;
|
||||
store.set_config(config);
|
||||
};
|
||||
assert_eq!(
|
||||
DBStore::open(dir.path(), None, false)
|
||||
DBStore::open(dir.path(), None, false, 1)
|
||||
.err()
|
||||
.unwrap()
|
||||
.to_string(),
|
||||
|
@ -449,7 +461,7 @@ mod tests {
|
|||
)
|
||||
);
|
||||
{
|
||||
let store = DBStore::open(dir.path(), None, true).unwrap();
|
||||
let store = DBStore::open(dir.path(), None, true, 1).unwrap();
|
||||
store.flush();
|
||||
let config = store.get_config().unwrap();
|
||||
assert_eq!(config.format, CURRENT_FORMAT);
|
||||
|
@ -467,14 +479,14 @@ mod tests {
|
|||
db.put(b"F", b"").unwrap(); // insert legacy DB compaction marker (in 'default' column family)
|
||||
};
|
||||
assert_eq!(
|
||||
DBStore::open(dir.path(), None, false)
|
||||
DBStore::open(dir.path(), None, false, 1)
|
||||
.err()
|
||||
.unwrap()
|
||||
.to_string(),
|
||||
format!("re-index required due to legacy format",)
|
||||
);
|
||||
{
|
||||
let store = DBStore::open(dir.path(), None, true).unwrap();
|
||||
let store = DBStore::open(dir.path(), None, true, 1).unwrap();
|
||||
store.flush();
|
||||
let config = store.get_config().unwrap();
|
||||
assert_eq!(config.format, CURRENT_FORMAT);
|
||||
|
@ -484,7 +496,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_db_prefix_scan() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = DBStore::open(dir.path(), None, true).unwrap();
|
||||
let store = DBStore::open(dir.path(), None, true, 1).unwrap();
|
||||
|
||||
let items = [
|
||||
*b"ab ",
|
||||
|
@ -509,7 +521,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_db_log_in_same_dir() {
|
||||
let dir1 = tempfile::tempdir().unwrap();
|
||||
let _store = DBStore::open(dir1.path(), None, true).unwrap();
|
||||
let _store = DBStore::open(dir1.path(), None, true, 1).unwrap();
|
||||
|
||||
// LOG file is created in dir1
|
||||
let dir_files = list_log_files(dir1.path());
|
||||
|
@ -517,7 +529,7 @@ mod tests {
|
|||
|
||||
let dir2 = tempfile::tempdir().unwrap();
|
||||
let dir3 = tempfile::tempdir().unwrap();
|
||||
let _store = DBStore::open(dir2.path(), Some(dir3.path()), true).unwrap();
|
||||
let _store = DBStore::open(dir2.path(), Some(dir3.path()), true, 1).unwrap();
|
||||
|
||||
// *_LOG file is not created in dir2, but in dir3
|
||||
let dir_files = list_log_files(dir2.path());
|
||||
|
|
|
@ -36,6 +36,7 @@ impl Tracker {
|
|||
&config.db_path,
|
||||
config.db_log_dir.as_deref(),
|
||||
config.auto_reindex,
|
||||
config.db_parallelism,
|
||||
)?;
|
||||
let chain = Chain::new(config.network);
|
||||
Ok(Self {
|
||||
|
|
Loading…
Add table
Reference in a new issue