mirror of
https://github.com/romanz/electrs.git
synced 2025-02-23 14:50:45 +01:00
Refactor bulk load benchmark
- bulk writes take ~1h (reading blk*.dat files at ~48MB/s) - full compaction takes ~45m (writing 37GB of SSTables)
This commit is contained in:
parent
1f979baf8f
commit
4da521b3cd
1 changed files with 43 additions and 35 deletions
|
@ -12,44 +12,45 @@ use std::sync::{
|
|||
|
||||
use electrs::{
|
||||
bulk::Parser, config::Config, daemon::Daemon, errors::*, metrics::Metrics,
|
||||
store::{DBStore, Row, StoreOptions}, util::{spawn_thread, SyncChannel},
|
||||
store::{DBStore, Row, StoreOptions, WriteStore}, util::{spawn_thread, SyncChannel},
|
||||
};
|
||||
|
||||
use error_chain::ChainedError;
|
||||
|
||||
type ReadResult = Result<Vec<u8>>;
|
||||
type IndexResult = Result<Vec<Row>>;
|
||||
type JoinHandle = std::thread::JoinHandle<()>;
|
||||
type JoinHandle = std::thread::JoinHandle<Result<()>>;
|
||||
type BlobReceiver = Arc<Mutex<Receiver<Vec<u8>>>>;
|
||||
|
||||
fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> Arc<Mutex<Receiver<ReadResult>>> {
|
||||
fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> (BlobReceiver, JoinHandle) {
|
||||
let chan = SyncChannel::new(0);
|
||||
let tx = chan.sender();
|
||||
spawn_thread("bulk_read", move || {
|
||||
let blobs = chan.sender();
|
||||
let handle = spawn_thread("bulk_read", move || -> Result<()> {
|
||||
for path in blk_files {
|
||||
tx.send(parser.read_blkfile(&path))
|
||||
.expect("failed to send blob");
|
||||
blobs
|
||||
.send(parser.read_blkfile(&path)?)
|
||||
.expect("failed to send blk*.dat contents");
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
Arc::new(Mutex::new(chan.into_receiver()))
|
||||
(Arc::new(Mutex::new(chan.into_receiver())), handle)
|
||||
}
|
||||
|
||||
fn start_indexer(
|
||||
rx: Arc<Mutex<Receiver<ReadResult>>>,
|
||||
tx: SyncSender<IndexResult>,
|
||||
blobs: BlobReceiver,
|
||||
parser: Arc<Parser>,
|
||||
rows: SyncSender<Vec<Row>>,
|
||||
) -> JoinHandle {
|
||||
spawn_thread("bulk_index", move || loop {
|
||||
let msg = match rx.lock().unwrap().recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(_) => {
|
||||
spawn_thread("bulk_index", move || -> Result<()> {
|
||||
loop {
|
||||
let msg = blobs.lock().unwrap().recv();
|
||||
if let Ok(blob) = msg {
|
||||
rows.send(parser.index_blkfile(blob)?)
|
||||
.expect("failed to send indexed rows")
|
||||
} else {
|
||||
debug!("no more blocks to index");
|
||||
break;
|
||||
}
|
||||
};
|
||||
tx.send(match msg {
|
||||
Ok(blob) => parser.index_blkfile(blob),
|
||||
Err(err) => Err(err),
|
||||
}).expect("failed to send indexed rows");
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -63,25 +64,32 @@ fn run(config: Config) -> Result<()> {
|
|||
config.network_type,
|
||||
&metrics,
|
||||
)?;
|
||||
let store = DBStore::open(
|
||||
Path::new("/opt/tmp/test-db"),
|
||||
StoreOptions { bulk_import: true },
|
||||
);
|
||||
let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true });
|
||||
let blk_files = daemon.list_blk_files()?;
|
||||
let parser = Arc::new(Parser::new(&daemon, &metrics)?);
|
||||
let reader = start_reader(blk_files, parser.clone());
|
||||
let indexed = SyncChannel::new(0);
|
||||
let (blobs, reader) = start_reader(blk_files, parser.clone());
|
||||
let rows_chan = SyncChannel::new(0);
|
||||
let indexers: Vec<JoinHandle> = (0..2)
|
||||
.map(|_| start_indexer(reader.clone(), indexed.sender(), parser.clone()))
|
||||
.map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
|
||||
.collect();
|
||||
spawn_thread("bulk_writer", move || {
|
||||
for rows in rows_chan.into_receiver() {
|
||||
debug!("indexed {:.2}M rows", rows.len() as f32 / 1e6);
|
||||
store.write(rows);
|
||||
}
|
||||
reader
|
||||
.join()
|
||||
.expect("reader panicked")
|
||||
.expect("reader failed");
|
||||
|
||||
for (i, rows) in indexed.into_receiver().into_iter().enumerate() {
|
||||
let path = format!("./sstables/{:05}.sst", i);
|
||||
store.sstable().build(Path::new(&path), rows?);
|
||||
debug!("{} built", path);
|
||||
}
|
||||
indexers.into_iter().for_each(|i| i.join().expect("indexer failed"));
|
||||
|
||||
indexers.into_iter().for_each(|i| {
|
||||
i.join()
|
||||
.expect("indexer panicked")
|
||||
.expect("indexing failed")
|
||||
});
|
||||
store.compact();
|
||||
}).join()
|
||||
.expect("writer panicked");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue