1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Allow higher concurrency for >dual core machines

This commit is contained in:
Roman Zeyde 2018-09-07 18:08:36 +03:00
parent 853513ed27
commit 58ea2ed960
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
5 changed files with 23 additions and 3 deletions

View File

@ -24,6 +24,7 @@ glob = "0.2"
hex = "0.3"
libc = "0.2"
log = "0.4"
num_cpus = "1.0"
page_size = "0.4"
prometheus = "0.4"
rocksdb = "0.10.1"

View File

@ -44,7 +44,7 @@ fn run_server(config: &Config) -> Result<()> {
} else {
// faster, but uses more memory
if is_fully_compacted(&store) == false {
let store = bulk::index_blk_files(&daemon, &metrics, store)?;
let store = bulk::index_blk_files(&daemon, config.bulk_index_threads, &metrics, store)?;
let store = full_compaction(store);
index.reload(&store); // make sure the block header index is up-to-date
store

View File

@ -213,7 +213,12 @@ fn start_indexer(
})
}
pub fn index_blk_files(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBStore> {
pub fn index_blk_files(
daemon: &Daemon,
index_threads: usize,
metrics: &Metrics,
store: DBStore,
) -> Result<DBStore> {
set_open_files_limit(2048); // twice the default `ulimit -n` value
let blk_files = daemon.list_blk_files()?;
info!("indexing {} blk*.dat files", blk_files.len());
@ -222,7 +227,7 @@ pub fn index_blk_files(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Re
let parser = Parser::new(daemon, metrics, indexed_blockhashes)?;
let (blobs, reader) = start_reader(blk_files, parser.clone());
let rows_chan = SyncChannel::new(0);
let indexers: Vec<JoinHandle> = (0..2)
let indexers: Vec<JoinHandle> = (0..index_threads)
.map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
.collect();
Ok(spawn_thread("bulk_writer", move || -> DBStore {

View File

@ -1,6 +1,7 @@
use bitcoin::network::constants::Network;
use clap::{App, Arg};
use dirs::home_dir;
use num_cpus;
use std::fs;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
@ -23,6 +24,7 @@ pub struct Config {
pub monitoring_addr: SocketAddr, // for Prometheus monitoring
pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems
pub index_batch_size: usize, // number of blocks to index in parallel
pub bulk_index_threads: usize, // number of threads to use for bulk indexing
}
impl Config {
@ -93,6 +95,12 @@ impl Config {
.help("Number of blocks to get in one JSONRPC request from bitcoind")
.default_value("100"),
)
.arg(
Arg::with_name("bulk_index_threads")
.long("bulk-index-threads")
.help("Number of threads used for bulk indexing (default: use the # of CPUs)")
.default_value("0")
)
.get_matches();
let network_name = m.value_of("network").unwrap_or("mainnet");
@ -160,6 +168,10 @@ impl Config {
stderrlog::Timestamp::Off
});
log.init().expect("logging initialization failed");
let mut bulk_index_threads = value_t_or_exit!(m, "bulk_index_threads", usize);
if bulk_index_threads == 0 {
bulk_index_threads = num_cpus::get();
}
let config = Config {
log,
network_type,
@ -171,6 +183,7 @@ impl Config {
monitoring_addr,
skip_bulk_import: m.is_present("skip_bulk_import"),
index_batch_size: value_t_or_exit!(m, "index_batch_size", usize),
bulk_index_threads,
};
eprintln!("{:?}", config);
config

View File

@ -9,6 +9,7 @@ extern crate dirs;
extern crate glob;
extern crate hex;
extern crate libc;
extern crate num_cpus;
extern crate page_size;
extern crate prometheus;
extern crate rocksdb;