1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Don't ignore signals during IBD

Fixes #532.

Also, don't fail electrs when exiting due to SIGINT/SIGTERM.
This commit is contained in:
Roman Zeyde 2021-10-05 21:39:52 +03:00
parent f896f9c2a4
commit 45a91f9b1b
8 changed files with 92 additions and 51 deletions

View File

@ -1,13 +1,7 @@
#![recursion_limit = "256"]
use anyhow::{Context, Result};
use electrs::{server, Config, Rpc, Tracker};
use anyhow::Result;
fn main() -> Result<()> {
let config = Config::from_args();
let rpc = Rpc::new(&config, Tracker::new(&config)?)?;
if config.sync_once {
return Ok(());
}
server::run(&config, rpc).context("server failed")
electrs::run()
}

View File

@ -16,6 +16,7 @@ use crate::{
chain::{Chain, NewHeader},
config::Config,
p2p::Connection,
signals::ExitFlag,
};
enum PollResult {
@ -73,31 +74,22 @@ fn read_cookie(path: &Path) -> Result<(String, String)> {
fn rpc_connect(config: &Config) -> Result<Client> {
let rpc_url = format!("http://{}", config.daemon_rpc_addr);
let mut client = {
// Allow `wait_for_new_block` to take a bit longer before timing out.
// See https://github.com/romanz/electrs/issues/495 for more details.
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
.url(&rpc_url)?
.timeout(config.jsonrpc_timeout);
let builder = match config.daemon_auth.get_auth() {
Auth::None => builder,
Auth::UserPass(user, pass) => builder.auth(user, Some(pass)),
Auth::CookieFile(path) => {
let (user, pass) = read_cookie(&path)?;
builder.auth(user, Some(pass))
}
};
Client::from_jsonrpc(jsonrpc::Client::with_transport(builder.build()))
};
loop {
match rpc_poll(&mut client) {
PollResult::Done(result) => return result.map(|()| client),
PollResult::Retry => {
std::thread::sleep(std::time::Duration::from_secs(1)); // wait a bit before polling
}
// Allow `wait_for_new_block` to take a bit longer before timing out.
// See https://github.com/romanz/electrs/issues/495 for more details.
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
.url(&rpc_url)?
.timeout(config.jsonrpc_timeout);
let builder = match config.daemon_auth.get_auth() {
Auth::None => builder,
Auth::UserPass(user, pass) => builder.auth(user, Some(pass)),
Auth::CookieFile(path) => {
let (user, pass) = read_cookie(&path)?;
builder.auth(user, Some(pass))
}
}
};
Ok(Client::from_jsonrpc(jsonrpc::Client::with_transport(
builder.build(),
)))
}
pub struct Daemon {
@ -120,8 +112,24 @@ struct BlockchainInfo {
}
impl Daemon {
pub fn connect(config: &Config) -> Result<Self> {
let rpc = rpc_connect(config)?;
pub(crate) fn connect(config: &Config, exit_flag: &ExitFlag) -> Result<Self> {
let mut rpc = rpc_connect(config)?;
loop {
exit_flag
.poll()
.context("bitcoin RPC polling interrupted")?;
match rpc_poll(&mut rpc) {
PollResult::Done(result) => {
result.context("bitcoind RPC polling failed")?;
break; // on success, finish polling
}
PollResult::Retry => {
std::thread::sleep(std::time::Duration::from_secs(1)); // wait a bit before polling
}
}
}
let network_info = rpc.get_network_info()?;
if network_info.version < 21_00_00 {
bail!("electrs requires bitcoind 0.21+");
@ -133,6 +141,7 @@ impl Daemon {
if blockchain_info.pruned {
bail!("electrs requires non-pruned bitcoind node");
}
let p2p = Mutex::new(Connection::connect(config.network, config.daemon_p2p_addr)?);
Ok(Self { p2p, rpc })
}

View File

@ -134,7 +134,10 @@ impl Rpc {
let signal = Signal::new();
tracker
.sync(&Daemon::connect(config)?, signal.exit_flag())
.sync(
&Daemon::connect(config, signal.exit_flag())?,
signal.exit_flag(),
)
.context("initial sync failed")?;
let cache = Cache::new(tracker.metrics());
@ -142,7 +145,7 @@ impl Rpc {
tracker,
cache,
rpc_duration,
daemon: Daemon::connect(config)?,
daemon: Daemon::connect(config, signal.exit_flag())?,
signal,
banner: config.server_banner.clone(),
port: config.electrum_rpc_addr.port(),

View File

@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::{Block, BlockHash, OutPoint, Txid};
@ -166,7 +166,7 @@ impl Index {
}
pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> {
while !exit_flag.is_set() {
loop {
let new_headers =
self.observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
if new_headers.is_empty() {
@ -179,12 +179,12 @@ impl Index {
new_headers.last().unwrap().height()
);
for chunk in new_headers.chunks(self.batch_size) {
if exit_flag.is_set() {
bail!(
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
}
})?;
let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
let mut heights = chunk.iter().map(|h| h.height());

View File

@ -20,11 +20,11 @@ mod mempool;
mod merkle;
mod metrics;
mod p2p;
pub mod server;
mod server;
mod signals;
mod status;
mod thread;
mod tracker;
mod types;
pub use {config::Config, electrum::Rpc, tracker::Tracker};
pub use server::run;

View File

@ -11,7 +11,9 @@ use std::{
use crate::{
config::Config,
electrum::{Client, Rpc},
signals::ExitError,
thread::spawn,
tracker::Tracker,
};
struct Peer {
@ -44,7 +46,27 @@ impl Peer {
}
}
pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
pub fn run() -> Result<()> {
let result = serve();
if let Err(e) = &result {
for cause in e.chain() {
if cause.downcast_ref::<ExitError>().is_some() {
info!("electrs stopped: {:?}", e);
return Ok(());
}
}
}
result.context("electrs failed")
}
fn serve() -> Result<()> {
let config = Config::from_args();
let tracker = Tracker::new(&config)?;
let mut rpc = Rpc::new(&config, tracker)?;
if config.sync_once {
return Ok(());
}
let listener = TcpListener::bind(config.electrum_rpc_addr)?;
info!("serving Electrum RPC on {}", listener.local_addr()?);
let new_block_rx = rpc.new_block_notification();
@ -57,9 +79,7 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
select! {
recv(rpc.signal().receiver()) -> result => {
result.context("signal channel disconnected")?;
if rpc.signal().exit_flag().is_set() {
break;
}
rpc.signal().exit_flag().poll().context("RPC server interrupted")?;
},
recv(new_block_rx) -> result => match result {
Ok(_) => (), // sync and update
@ -77,7 +97,6 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
rpc.sync().context("rpc sync failed")?;
peers = notify_peers(&rpc, peers); // peers are disconnected on error.
}
info!("stopping Electrum RPC server");
Ok(())
}

View File

@ -7,9 +7,21 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::{error, fmt};
use crate::thread::spawn;
#[derive(Debug)]
pub struct ExitError;
impl fmt::Display for ExitError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "exiting due to signal")
}
}
impl error::Error for ExitError {}
#[derive(Clone)]
pub(crate) struct ExitFlag {
flag: Arc<AtomicBool>,
@ -22,8 +34,12 @@ impl ExitFlag {
}
}
pub fn is_set(&self) -> bool {
self.flag.load(Ordering::Relaxed)
pub fn poll(&self) -> Result<(), ExitError> {
if self.flag.load(Ordering::Relaxed) {
Err(ExitError)
} else {
Ok(())
}
}
fn set(&self) {

View File

@ -98,7 +98,7 @@ echo "Electrum `$EL stop`" # disconnect wallet
wait $ELECTRUM_PID
kill -INT $ELECTRS_PID # close server
tail_log data/electrs/regtest-debug.log | grep -m1 "stopping Electrum RPC server"
tail_log data/electrs/regtest-debug.log | grep -m1 "electrs stopped"
wait $ELECTRS_PID
$BTC stop # stop bitcoind