1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00
electrs/src/server.rs

197 lines
5.9 KiB
Rust
Raw Normal View History

2021-03-26 09:05:58 +01:00
use anyhow::{Context, Result};
use crossbeam_channel::{select, unbounded, Sender};
2021-03-26 09:05:58 +01:00
use rayon::prelude::*;
use std::{
collections::hash_map::HashMap,
io::{BufRead, BufReader, Write},
net::{Shutdown, TcpListener, TcpStream},
};
use crate::{
config::Config,
electrum::{Client, Rpc},
signals::ExitError,
thread::spawn,
tracker::Tracker,
2021-03-26 09:05:58 +01:00
};
struct Peer {
id: usize,
2021-03-26 09:05:58 +01:00
client: Client,
stream: TcpStream,
}
impl Peer {
fn new(id: usize, stream: TcpStream) -> Self {
let client = Client::default();
Self { id, client, stream }
2021-03-26 09:05:58 +01:00
}
fn send(&mut self, values: Vec<String>) -> Result<()> {
for mut value in values {
debug!("{}: send {}", self.id, value);
value += "\n";
self.stream
.write_all(value.as_bytes())
.with_context(|| format!("failed to send response: {:?}", value))?;
}
Ok(())
}
fn disconnect(self) {
2021-10-01 09:17:06 +02:00
if let Err(e) = self.stream.shutdown(Shutdown::Both) {
warn!("{}: failed to shutdown TCP connection {}", self.id, e)
}
}
}
/// Runs the server until it stops.
///
/// An error whose cause chain contains an `ExitError` is logged at info level and
/// reported as success; any other error is returned with the "electrs failed" context.
pub fn run() -> Result<()> {
    match serve() {
        Ok(()) => Ok(()),
        Err(e) => {
            let exit_requested = e
                .chain()
                .any(|cause| cause.downcast_ref::<ExitError>().is_some());
            if exit_requested {
                info!("electrs stopped: {:?}", e);
                Ok(())
            } else {
                Err(e.context("electrs failed"))
            }
        }
    }
}
fn serve() -> Result<()> {
let config = Config::from_args();
let tracker = Tracker::new(&config)?;
let mut rpc = Rpc::new(&config, tracker)?;
if config.sync_once {
return Ok(());
}
2021-03-26 09:05:58 +01:00
let (server_tx, server_rx) = unbounded();
if !config.disable_electrum_rpc {
let listener = TcpListener::bind(config.electrum_rpc_addr)?;
info!("serving Electrum RPC on {}", listener.local_addr()?);
spawn("accept_loop", || accept_loop(listener, server_tx)); // detach accepting thread
};
2021-03-26 09:05:58 +01:00
let new_block_rx = rpc.new_block_notification();
2021-03-26 09:05:58 +01:00
let mut peers = HashMap::<usize, Peer>::new();
loop {
select! {
recv(rpc.signal().receiver()) -> result => {
result.context("signal channel disconnected")?;
rpc.signal().exit_flag().poll().context("RPC server interrupted")?;
2021-03-26 09:05:58 +01:00
},
recv(new_block_rx) -> result => match result {
2021-03-26 09:05:58 +01:00
Ok(_) => (), // sync and update
Err(_) => break, // daemon is shutting down
},
recv(server_rx) -> event => {
let event = event.context("server disconnected")?;
2021-10-03 15:13:40 +02:00
handle_event(&rpc, &mut peers, event);
2021-03-26 09:05:58 +01:00
},
default(config.wait_duration) => (), // sync and update
2021-03-26 09:05:58 +01:00
};
2021-10-03 15:13:40 +02:00
if !server_rx.is_empty() {
continue; // continue RPC processing (if not done)
}
2021-03-26 09:05:58 +01:00
rpc.sync().context("rpc sync failed")?;
peers = notify_peers(&rpc, peers); // peers are disconnected on error.
2021-03-26 09:05:58 +01:00
}
Ok(())
}
fn notify_peers(rpc: &Rpc, peers: HashMap<usize, Peer>) -> HashMap<usize, Peer> {
peers
.into_par_iter()
2021-07-20 17:56:43 +02:00
.filter_map(|(_, mut peer)| match notify_peer(rpc, &mut peer) {
Ok(()) => Some((peer.id, peer)),
Err(e) => {
error!("failed to notify peer {}: {}", peer.id, e);
peer.disconnect();
None
}
})
.collect()
}
/// Asks `rpc` to update the peer's client state and sends the resulting
/// notifications to the peer.
///
/// Fails if the notifications cannot be generated or cannot be written to the peer.
fn notify_peer(rpc: &Rpc, peer: &mut Peer) -> Result<()> {
    rpc.update_client(&mut peer.client)
        .context("failed to generate notifications")
        .and_then(|notifications| {
            peer.send(notifications)
                .context("failed to send notifications")
        })
}
2021-03-26 09:05:58 +01:00
/// A message sent to the server loop, tagged with the connection it belongs to.
struct Event {
/// Identifier of the connection that produced this event.
peer_id: usize,
/// What happened on that connection.
msg: Message,
}
/// The kinds of events a receiving thread reports for its connection.
enum Message {
/// A connection was accepted; carries a cloned handle of its stream.
New(TcpStream),
/// One line of text was received from the peer.
Request(String),
/// The receiving loop reached the end of the stream.
Done,
}
fn handle_event(rpc: &Rpc, peers: &mut HashMap<usize, Peer>, event: Event) {
let Event { msg, peer_id } = event;
match msg {
2021-03-26 09:05:58 +01:00
Message::New(stream) => {
debug!("{}: connected", peer_id);
peers.insert(peer_id, Peer::new(peer_id, stream));
2021-03-26 09:05:58 +01:00
}
Message::Request(line) => {
let result = match peers.get_mut(&peer_id) {
Some(peer) => handle_request(rpc, peer, &line),
None => return, // unknown peer
2021-03-26 09:05:58 +01:00
};
if let Err(e) = result {
error!("{}: disconnecting due to {}", peer_id, e);
peers.remove(&peer_id).unwrap().disconnect();
2021-03-26 09:05:58 +01:00
}
}
Message::Done => {
// already disconnected, just remove from peers' map
peers.remove(&peer_id);
2021-03-26 09:05:58 +01:00
}
}
}
/// Lets `rpc` handle one request line for this peer's client and writes the
/// single response back to the peer.
///
/// Fails only if the response cannot be sent.
fn handle_request(rpc: &Rpc, peer: &mut Peer, line: &str) -> Result<()> {
    let reply = rpc.handle_request(&mut peer.client, line);
    let batch = vec![reply];
    peer.send(batch)
}
fn accept_loop(listener: TcpListener, server_tx: Sender<Event>) -> Result<()> {
for (peer_id, conn) in listener.incoming().enumerate() {
let stream = conn.context("failed to accept")?;
let tx = server_tx.clone();
spawn("recv_loop", move || {
let result = recv_loop(peer_id, &stream, tx);
2021-10-01 09:17:06 +02:00
if let Err(e) = stream.shutdown(Shutdown::Read) {
warn!("{}: failed to shutdown TCP receiving {}", peer_id, e)
}
2021-03-26 09:05:58 +01:00
result
});
}
Ok(())
}
fn recv_loop(peer_id: usize, stream: &TcpStream, server_tx: Sender<Event>) -> Result<()> {
let msg = Message::New(stream.try_clone()?);
server_tx.send(Event { peer_id, msg })?;
for line in BufReader::new(stream).lines() {
2021-03-26 09:05:58 +01:00
let line = line.with_context(|| format!("{}: recv failed", peer_id))?;
debug!("{}: recv {}", peer_id, line);
let msg = Message::Request(line);
server_tx.send(Event { peer_id, msg })?;
}
debug!("{}: disconnected", peer_id);
let msg = Message::Done;
server_tx.send(Event { peer_id, msg })?;
2021-03-26 09:05:58 +01:00
Ok(())
}