mirror of
https://github.com/romanz/electrs.git
synced 2025-02-24 15:02:21 +01:00
Don't deadlock when shutting down
This commit is contained in:
parent
7d23da5ffc
commit
a3bfdda32a
1 changed files with 26 additions and 46 deletions
62
src/rpc.rs
62
src/rpc.rs
|
@ -8,9 +8,10 @@ use serde_json::{from_str, Value};
|
|||
use std::collections::HashMap;
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::mpsc::{Sender, SyncSender};
|
||||
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::errors::*;
|
||||
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
|
||||
|
@ -488,21 +489,24 @@ struct Stats {
|
|||
impl RPC {
|
||||
fn start_notifier(
|
||||
notification: Channel<Notification>,
|
||||
senders: Arc<Mutex<HashMap<i32, SyncSender<Message>>>>,
|
||||
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
|
||||
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
|
||||
) {
|
||||
spawn_thread("notification", move || {
|
||||
for msg in notification.receiver().iter() {
|
||||
let senders = senders.lock().unwrap();
|
||||
let mut senders = senders.lock().unwrap();
|
||||
match msg {
|
||||
Notification::Periodic => {
|
||||
for (i, sender) in senders.iter() {
|
||||
if let Err(e) = sender.try_send(Message::PeriodicUpdate) {
|
||||
debug!("failed to send PeriodicUpdate to peer {}: {}", i, e);
|
||||
for sender in senders.split_off(0) {
|
||||
if let Err(TrySendError::Disconnected(_)) =
|
||||
sender.try_send(Message::PeriodicUpdate)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
senders.push(sender);
|
||||
}
|
||||
}
|
||||
}
|
||||
Notification::Exit => acceptor.send(None).unwrap(),
|
||||
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -541,60 +545,36 @@ impl RPC {
|
|||
)),
|
||||
});
|
||||
let notification = Channel::unbounded();
|
||||
|
||||
RPC {
|
||||
notification: notification.sender(),
|
||||
server: Some(spawn_thread("rpc", move || {
|
||||
let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
|
||||
let handles = Arc::new(Mutex::new(
|
||||
HashMap::<i32, std::thread::JoinHandle<()>>::new(),
|
||||
));
|
||||
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
|
||||
|
||||
let acceptor = RPC::start_acceptor(addr);
|
||||
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
|
||||
|
||||
let mut handle_count = 0;
|
||||
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
|
||||
let handle_id = handle_count;
|
||||
handle_count += 1;
|
||||
|
||||
// explicitely scope the shadowed variables for the new thread
|
||||
let handle: thread::JoinHandle<()> = {
|
||||
let query = Arc::clone(&query);
|
||||
let senders = Arc::clone(&senders);
|
||||
let stats = Arc::clone(&stats);
|
||||
let handles = Arc::clone(&handles);
|
||||
|
||||
// HACK: detach peer-handling threads
|
||||
spawn_thread("peer", move || {
|
||||
info!("[{}] connected peer #{}", addr, handle_id);
|
||||
info!("[{}] connected peer", addr);
|
||||
let conn = Connection::new(query, stream, addr, stats, relayfee);
|
||||
senders
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(handle_id, conn.chan.sender());
|
||||
senders.lock().unwrap().push(conn.chan.sender());
|
||||
conn.run();
|
||||
info!("[{}] disconnected peer #{}", addr, handle_id);
|
||||
|
||||
senders.lock().unwrap().remove(&handle_id);
|
||||
handles.lock().unwrap().remove(&handle_id);
|
||||
})
|
||||
};
|
||||
|
||||
handles.lock().unwrap().insert(handle_id, handle);
|
||||
info!("[{}] disconnected peer", addr);
|
||||
});
|
||||
}
|
||||
trace!("closing {} RPC connections", senders.lock().unwrap().len());
|
||||
for sender in senders.lock().unwrap().values() {
|
||||
for sender in senders.lock().unwrap().iter() {
|
||||
let _ = sender.send(Message::Done);
|
||||
}
|
||||
thread::sleep(Duration::from_secs(1)); // TODO: actually wait for threads
|
||||
|
||||
trace!(
|
||||
"waiting for {} RPC handling threads",
|
||||
handles.lock().unwrap().len()
|
||||
);
|
||||
for (_, handle) in handles.lock().unwrap().drain() {
|
||||
if let Err(e) = handle.join() {
|
||||
warn!("failed to join thread: {:?}", e);
|
||||
}
|
||||
}
|
||||
trace!("RPC connections are closed");
|
||||
})),
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue