From 35f456751f201364dc3f29854af66aee4b139536 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 17 Jul 2019 20:14:06 +0300 Subject: [PATCH] Allow stopping bulk indexing via SIGINT/SIGTERM --- src/bin/electrs.rs | 3 ++- src/bulk.rs | 12 +++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 6c22d71..cca1c01 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -49,7 +49,8 @@ fn run_server(config: &Config) -> Result<()> { full_compaction(store) } else { // faster, but uses more memory - let store = bulk::index_blk_files(&daemon, config.bulk_index_threads, &metrics, store)?; + let store = + bulk::index_blk_files(&daemon, config.bulk_index_threads, &metrics, &signal, store)?; let store = full_compaction(store); index.reload(&store); // make sure the block header index is up-to-date store diff --git a/src/bulk.rs b/src/bulk.rs index 7a7b8c8..82a5fb3 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -17,6 +17,7 @@ use crate::daemon::Daemon; use crate::errors::*; use crate::index::{index_block, last_indexed_block, read_indexed_blockhashes}; use crate::metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics}; +use crate::signal::Waiter; use crate::store::{DBStore, Row, WriteStore}; use crate::util::{spawn_thread, HeaderList, SyncChannel}; @@ -225,6 +226,7 @@ pub fn index_blk_files( daemon: &Daemon, index_threads: usize, metrics: &Metrics, + signal: &Waiter, store: DBStore, ) -> Result { set_open_files_limit(2048); // twice the default `ulimit -n` value @@ -238,10 +240,14 @@ pub fn index_blk_files( let indexers: Vec = (0..index_threads) .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender())) .collect(); - Ok(spawn_thread("bulk_writer", move || -> DBStore { + let signal = signal.clone(); + spawn_thread("bulk_writer", move || -> Result { for (rows, path) in rows_chan.into_receiver() { trace!("indexed {:?}: {} rows", path, rows.len()); store.write(rows); + signal + .poll() + .chain_err(|| "stopping bulk indexing due to signal")?; } reader .join() @@ -254,10 +260,10 @@ pub fn index_blk_files( .expect("indexing failed") }); store.write(vec![parser.last_indexed_row()]); - store + Ok(store) }) .join() - .expect("writer panicked")) + .expect("writer panicked") } #[cfg(test)]