1
0
Fork 0
mirror of https://github.com/romanz/electrs.git synced 2025-02-24 23:08:39 +01:00

Actually wait for threads in RPC.

This change implements non-blocking cleanup of threads. It achieves it
by cleaning up periodically based on thread ID, that gets sent from dead
thread over a channel. Aside from internal locks in the channel, there
are no other locks. The cleanup also happens only after a new connection
is accepted, but hopefully won't be an issue, unless there are many
connections that die and then nothing connects for a long time. A final
cleanup happens when the thread is finishing.
This commit is contained in:
Martin Habovstiak 2020-04-12 22:53:35 +02:00
parent a3bfdda32a
commit bc45dbfe41

View file

@ -11,7 +11,6 @@ use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError}; use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration;
use crate::errors::*; use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
@ -554,26 +553,47 @@ impl RPC {
let acceptor = RPC::start_acceptor(addr); let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender()); RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let mut threads = HashMap::new();
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
// explicitely scope the shadowed variables for the new thread // explicitely scope the shadowed variables for the new thread
let query = Arc::clone(&query); let query = Arc::clone(&query);
let senders = Arc::clone(&senders); let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats); let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();
// HACK: detach peer-handling threads // HACK: detach peer-handling threads
spawn_thread("peer", move || { let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr); info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats, relayfee); let conn = Connection::new(query, stream, addr, stats, relayfee);
senders.lock().unwrap().push(conn.chan.sender()); senders.lock().unwrap().push(conn.chan.sender());
conn.run(); conn.run();
info!("[{}] disconnected peer", addr); info!("[{}] disconnected peer", addr);
let _ = garbage_sender.send(std::thread::current().id());
}); });
threads.insert(spawned.thread().id(), spawned);
while let Ok(id) = garbage_receiver.try_recv() {
let result = threads
.remove(&id)
.map(std::thread::JoinHandle::join)
.transpose();
if let Err(error) = result {
error!("Failed to join thread: {:?}", error);
}
}
} }
trace!("closing {} RPC connections", senders.lock().unwrap().len()); trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().iter() { for sender in senders.lock().unwrap().iter() {
let _ = sender.send(Message::Done); let _ = sender.send(Message::Done);
} }
thread::sleep(Duration::from_secs(1)); // TODO: actually wait for threads for (_, thread) in threads {
if let Err(error) = thread.join() {
error!("Failed to join thread: {:?}", error);
}
}
trace!("RPC connections are closed"); trace!("RPC connections are closed");
})), })),