From e69686f988783d9408416d19768822daefb10f31 Mon Sep 17 00:00:00 2001
From: Roman Zeyde
Date: Tue, 10 Jul 2018 23:52:26 +0300
Subject: [PATCH] Use multiple bulk index workers

---
 examples/bench_parse.rs | 67 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 59 insertions(+), 8 deletions(-)

diff --git a/examples/bench_parse.rs b/examples/bench_parse.rs
index a760d25..aff14c4 100644
--- a/examples/bench_parse.rs
+++ b/examples/bench_parse.rs
@@ -5,15 +5,54 @@ extern crate log;
 extern crate error_chain;
-use std::path::Path;
+use std::path::{Path, PathBuf};
+use std::sync::{
+    mpsc::{Receiver, SyncSender}, Arc, Mutex,
+};
 use electrs::{
     bulk::Parser,
     config::Config,
     daemon::Daemon,
     errors::*,
     metrics::Metrics,
-    store::{DBStore, StoreOptions, WriteStore},
+    store::{DBStore, Row, StoreOptions},
     util::{spawn_thread, SyncChannel},
 };
 use error_chain::ChainedError;
+type ReadResult = Result<Vec<u8>>;
+type IndexResult = Result<Vec<Row>>;
+type JoinHandle = std::thread::JoinHandle<()>;
+
+fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> Arc<Mutex<Receiver<ReadResult>>> {
+    let chan = SyncChannel::new(0);
+    let tx = chan.sender();
+    spawn_thread("bulk_read", move || {
+        for path in blk_files {
+            tx.send(parser.read_blkfile(&path))
+                .expect("failed to send blob");
+        }
+    });
+    Arc::new(Mutex::new(chan.into_receiver()))
+}
+
+fn start_indexer(
+    rx: Arc<Mutex<Receiver<ReadResult>>>,
+    tx: SyncSender<IndexResult>,
+    parser: Arc<Parser>,
+) -> JoinHandle {
+    spawn_thread("bulk_index", move || loop {
+        let msg = match rx.lock().unwrap().recv() {
+            Ok(msg) => msg,
+            Err(_) => {
+                debug!("no more blocks to index");
+                break;
+            }
+        };
+        tx.send(match msg {
+            Ok(blob) => parser.index_blkfile(blob),
+            Err(err) => Err(err),
+        }).expect("failed to send indexed rows");
+    })
+}
+
 fn run(config: Config) -> Result<()> {
     let metrics = Metrics::new(config.monitoring_addr);
     metrics.start();
@@ -24,13 +63,25 @@ fn run(config: Config) -> Result<()> {
         config.network_type,
         &metrics,
     )?;
-    let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true });
-    let parser = Parser::new(&daemon, &metrics)?;
-    for path in daemon.list_blk_files()? {
-        let blob = parser.read_blkfile(&path)?;
-        let rows = parser.index_blkfile(blob)?;
-        store.write(rows);
+    let store = DBStore::open(
+        Path::new("/opt/tmp/test-db"),
+        StoreOptions { bulk_import: true },
+    );
+    let blk_files = daemon.list_blk_files()?;
+    let parser = Arc::new(Parser::new(&daemon, &metrics)?);
+    let reader = start_reader(blk_files, parser.clone());
+    let indexed = SyncChannel::new(0);
+    let indexers: Vec<JoinHandle> = (0..2)
+        .map(|_| start_indexer(reader.clone(), indexed.sender(), parser.clone()))
+        .collect();
+
+    for (i, rows) in indexed.into_receiver().into_iter().enumerate() {
+        let path = format!("./sstables/{:05}.sst", i);
+        store.sstable().build(Path::new(&path), rows?);
+        debug!("{} built", path);
     }
+    indexers.into_iter().for_each(|i| i.join().expect("indexer failed"));
+
     Ok(())
 }
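
Note on the pattern (not part of the patch): the change turns the sequential
read/index loop into a pipeline. One "bulk_read" thread performs the sequential
disk reads, two "bulk_index" workers share the read channel behind a Mutex and
index blobs in parallel, and the main thread remains the single writer. The
sketch below reproduces that fan-out/fan-in shape using only std::sync::mpsc;
SyncChannel, Parser, read_blkfile and index_blkfile are electrs internals, so
read_file, index_blob and NUM_INDEXERS here are illustrative stand-ins, and the
capacity-0 (rendezvous) channels mirror SyncChannel::new(0) above.

use std::sync::{mpsc::sync_channel, Arc, Mutex};
use std::thread;

const NUM_INDEXERS: usize = 2; // the patch hardcodes (0..2)

// Stand-in for Parser::read_blkfile: pretend to read one blk file.
fn read_file(id: usize) -> Vec<u8> {
    vec![id as u8; 8]
}

// Stand-in for Parser::index_blkfile: turn a blob into index rows.
fn index_blob(blob: Vec<u8>) -> Vec<String> {
    blob.iter().map(|b| format!("row-{}", b)).collect()
}

fn main() {
    // Reader thread: sequential reads into a rendezvous (capacity-0) channel.
    let (read_tx, read_rx) = sync_channel::<Vec<u8>>(0);
    let reader = thread::spawn(move || {
        for i in 0..10 {
            read_tx.send(read_file(i)).expect("failed to send blob");
        }
        // read_tx is dropped here, so the indexers' recv() eventually fails.
    });

    // Share one Receiver among several indexers by wrapping it in a Mutex.
    let read_rx = Arc::new(Mutex::new(read_rx));
    let (idx_tx, idx_rx) = sync_channel::<Vec<String>>(0);

    let indexers: Vec<_> = (0..NUM_INDEXERS)
        .map(|_| {
            let rx = Arc::clone(&read_rx);
            let tx = idx_tx.clone();
            thread::spawn(move || loop {
                // The lock guard lives only until the end of this statement,
                // so the channel is locked per blob, not per iteration.
                let blob = match rx.lock().unwrap().recv() {
                    Ok(blob) => blob,
                    Err(_) => break, // reader finished and channel drained
                };
                tx.send(index_blob(blob)).expect("failed to send rows");
            })
        })
        .collect();
    drop(idx_tx); // only the indexers' clones remain, so idx_rx can terminate

    // Main thread stays the single writer, consuming rows in arrival order.
    for (i, rows) in idx_rx.into_iter().enumerate() {
        println!("batch {:05}: {} rows", i, rows.len());
    }

    reader.join().expect("reader failed");
    indexers
        .into_iter()
        .for_each(|h| h.join().expect("indexer failed"));
}

The key design point, in the patch as in the sketch, is that the Mutex guard
returned by lock() is dropped at the end of the let statement, so workers hold
the shared receiver only while taking a blob; the indexing itself runs outside
the lock and therefore in parallel.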