diff --git a/examples/bench_parse.rs b/examples/bench_parse.rs
index aff14c4..93fc250 100644
--- a/examples/bench_parse.rs
+++ b/examples/bench_parse.rs
@@ -12,44 +12,45 @@ use std::sync::{
 use electrs::{
     bulk::Parser, config::Config, daemon::Daemon, errors::*, metrics::Metrics,
-    store::{DBStore, Row, StoreOptions}, util::{spawn_thread, SyncChannel},
+    store::{DBStore, Row, StoreOptions, WriteStore}, util::{spawn_thread, SyncChannel},
 };
 
 use error_chain::ChainedError;
 
-type ReadResult = Result<Vec<u8>>;
-type IndexResult = Result<Vec<Row>>;
-type JoinHandle = std::thread::JoinHandle<()>;
+type JoinHandle = std::thread::JoinHandle<Result<()>>;
+type BlobReceiver = Arc<Mutex<Receiver<Vec<u8>>>>;
 
-fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> Arc<Mutex<Receiver<ReadResult>>> {
+fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> (BlobReceiver, JoinHandle) {
     let chan = SyncChannel::new(0);
-    let tx = chan.sender();
-    spawn_thread("bulk_read", move || {
+    let blobs = chan.sender();
+    let handle = spawn_thread("bulk_read", move || -> Result<()> {
         for path in blk_files {
-            tx.send(parser.read_blkfile(&path))
-                .expect("failed to send blob");
+            blobs
+                .send(parser.read_blkfile(&path)?)
+                .expect("failed to send blk*.dat contents");
         }
+        Ok(())
     });
-    Arc::new(Mutex::new(chan.into_receiver()))
+    (Arc::new(Mutex::new(chan.into_receiver())), handle)
 }
 
 fn start_indexer(
-    rx: Arc<Mutex<Receiver<ReadResult>>>,
-    tx: SyncSender<IndexResult>,
+    blobs: BlobReceiver,
     parser: Arc<Parser>,
+    rows: SyncSender<Vec<Row>>,
 ) -> JoinHandle {
-    spawn_thread("bulk_index", move || loop {
-        let msg = match rx.lock().unwrap().recv() {
-            Ok(msg) => msg,
-            Err(_) => {
+    spawn_thread("bulk_index", move || -> Result<()> {
+        loop {
+            let msg = blobs.lock().unwrap().recv();
+            if let Ok(blob) = msg {
+                rows.send(parser.index_blkfile(blob)?)
+                    .expect("failed to send indexed rows")
+            } else {
                 debug!("no more blocks to index");
                 break;
             }
-        };
-        tx.send(match msg {
-            Ok(blob) => parser.index_blkfile(blob),
-            Err(err) => Err(err),
-        }).expect("failed to send indexed rows");
+        }
+        Ok(())
     })
 }
@@ -63,25 +64,32 @@ fn run(config: Config) -> Result<()> {
         config.network_type,
         &metrics,
     )?;
-    let store = DBStore::open(
-        Path::new("/opt/tmp/test-db"),
-        StoreOptions { bulk_import: true },
-    );
+    let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true });
     let blk_files = daemon.list_blk_files()?;
     let parser = Arc::new(Parser::new(&daemon, &metrics)?);
-    let reader = start_reader(blk_files, parser.clone());
-    let indexed = SyncChannel::new(0);
+    let (blobs, reader) = start_reader(blk_files, parser.clone());
+    let rows_chan = SyncChannel::new(0);
     let indexers: Vec<JoinHandle> = (0..2)
-        .map(|_| start_indexer(reader.clone(), indexed.sender(), parser.clone()))
+        .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
         .collect();
+    spawn_thread("bulk_writer", move || {
+        for rows in rows_chan.into_receiver() {
+            debug!("indexed {:.2}M rows", rows.len() as f32 / 1e6);
+            store.write(rows);
+        }
+        reader
+            .join()
+            .expect("reader panicked")
+            .expect("reader failed");
-    for (i, rows) in indexed.into_receiver().into_iter().enumerate() {
-        let path = format!("./sstables/{:05}.sst", i);
-        store.sstable().build(Path::new(&path), rows?);
-        debug!("{} built", path);
-    }
-    indexers.into_iter().for_each(|i| i.join().expect("indexer failed"));
-
+        indexers.into_iter().for_each(|i| {
+            i.join()
+                .expect("indexer panicked")
+                .expect("indexing failed")
+        });
+        store.compact();
+    }).join()
+    .expect("writer panicked");
     Ok(())
 }
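For reference, the shape this patch converges on is a three-stage pipeline: one `bulk_read` thread streams blk*.dat blobs, two `bulk_index` threads share the blob receiver behind a `Mutex`, and a single `bulk_writer` thread serializes all `store.write()` calls and joins the upstream handles so each thread's `Result<()>` resurfaces. Below is a minimal standalone sketch of that pattern, using only `std::sync::mpsc` in place of electrs's `SyncChannel`; the `fallible_read`/`fallible_index` helpers and the toy payload types are hypothetical stand-ins for `Parser::read_blkfile`/`index_blkfile`, not part of the patch.

```rust
use std::sync::{
    mpsc::{sync_channel, Receiver, SyncSender},
    Arc, Mutex,
};
use std::thread::{self, JoinHandle};

// Worker errors travel through the JoinHandle, mirroring the patch's
// `type JoinHandle = std::thread::JoinHandle<Result<()>>`.
type Result<T> = std::result::Result<T, String>;

// Hypothetical stand-in for Parser::read_blkfile.
fn fallible_read(i: u32) -> Result<Vec<u8>> {
    Ok(vec![i as u8; 4])
}

// Hypothetical stand-in for Parser::index_blkfile.
fn fallible_index(blob: Vec<u8>) -> Result<Vec<String>> {
    Ok(blob.iter().map(|b| format!("row-{}", b)).collect())
}

fn start_reader(inputs: Vec<u32>, blobs: SyncSender<Vec<u8>>) -> JoinHandle<Result<()>> {
    thread::spawn(move || -> Result<()> {
        for i in inputs {
            // `?` aborts this thread; the error resurfaces at join() time.
            blobs.send(fallible_read(i)?).expect("failed to send blob");
        }
        Ok(()) // dropping `blobs` closes the channel for the indexers
    })
}

fn start_indexer(
    blobs: Arc<Mutex<Receiver<Vec<u8>>>>,
    rows: SyncSender<Vec<String>>,
) -> JoinHandle<Result<()>> {
    thread::spawn(move || -> Result<()> {
        loop {
            // Hold the lock only for recv(), so indexers run in parallel.
            let msg = blobs.lock().unwrap().recv();
            match msg {
                Ok(blob) => rows.send(fallible_index(blob)?).expect("failed to send rows"),
                Err(_) => break, // channel closed: reader is done
            }
        }
        Ok(())
    })
}

fn main() {
    // Zero-capacity (rendezvous) channels give the same backpressure as SyncChannel::new(0).
    let (blob_tx, blob_rx) = sync_channel(0);
    let (row_tx, row_rx) = sync_channel(0);
    let blob_rx = Arc::new(Mutex::new(blob_rx));

    let reader = start_reader((0..4).collect(), blob_tx);
    let indexers: Vec<_> = (0..2)
        .map(|_| start_indexer(blob_rx.clone(), row_tx.clone()))
        .collect();
    drop(row_tx); // the writer loop below ends once every indexer hangs up

    // Single writer: all "DB writes" stay on one thread, like bulk_writer.
    for rows in row_rx {
        println!("writing {} rows", rows.len());
    }
    reader.join().expect("reader panicked").expect("reader failed");
    for i in indexers {
        i.join().expect("indexer panicked").expect("indexing failed");
    }
}
```

The zero-capacity channels keep memory bounded: the reader cannot run ahead of the indexers, and the indexers cannot run ahead of the writer, which appears to be the same reason the patch keeps `SyncChannel::new(0)` at both stages.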