1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Rename Handler -> Connection and refactor a bit

This commit is contained in:
Roman Zeyde 2018-05-18 11:34:18 +03:00
parent d38308797e
commit f3ac83a376
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB

View File

@ -68,18 +68,22 @@ fn jsonify_header(header: &BlockHeader, height: usize) -> Value {
})
}
struct Handler<'a> {
struct Connection<'a> {
query: &'a Query<'a>,
headers_subscribe: bool,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
}
impl<'a> Handler<'a> {
pub fn new(query: &'a Query) -> Handler<'a> {
Handler {
impl<'a> Connection<'a> {
pub fn new(query: &'a Query, stream: TcpStream, addr: SocketAddr) -> Connection<'a> {
Connection {
query: query,
headers_subscribe: false,
status_hashes: HashMap::new(),
stream,
addr,
}
}
@ -252,19 +256,22 @@ impl<'a> Handler<'a> {
Ok(result)
}
fn handle_replies(
&mut self,
stream: &mut TcpStream,
addr: SocketAddr,
chan: &Channel,
) -> Result<()> {
fn send_value(&mut self, v: Value) -> Result<()> {
debug!("[{}] <- {}", self.addr, v);
let line = v.to_string() + "\n";
self.stream
.write_all(line.as_bytes())
.chain_err(|| format!("failed to send {}", v))
}
fn handle_replies(&mut self, chan: &Channel) -> Result<()> {
let rx = chan.receiver();
loop {
let msg = rx.recv().chain_err(|| "channel closed")?;
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
debug!("[{}] -> {}", addr, cmd);
debug!("[{}] -> {}", self.addr, cmd);
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
@ -273,22 +280,14 @@ impl<'a> Handler<'a> {
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
debug!("[{}] <- {}", addr, reply);
let line = reply.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
self.send_value(reply)?
}
Message::Block(blockhash) => {
debug!("blockhash found: {}", blockhash);
for update in self.update_subscriptions()
.chain_err(|| "failed to get updates")?
{
debug!("update: {}", update);
let line = update.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send update")?;
self.send_value(update)?
}
}
Message::Done => {
@ -315,13 +314,13 @@ impl<'a> Handler<'a> {
}
}
pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) {
let reader = BufReader::new(stream.try_clone().expect("failed to clone TcpStream"));
pub fn run(mut self, chan: &Channel) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
// TODO: figure out graceful shutting down and error logging.
crossbeam::scope(|scope| {
let tx = chan.sender();
scope.spawn(|| Handler::handle_requests(reader, tx));
self.handle_replies(&mut stream, addr, chan).unwrap();
scope.spawn(|| Connection::handle_requests(reader, tx));
self.handle_replies(chan).unwrap();
});
}
}
@ -358,7 +357,7 @@ pub fn serve(addr: &str, query: &Query, chan: Channel) {
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
Handler::new(query).run(stream, addr, &chan);
Connection::new(query, stream, addr).run(&chan);
info!("[{}] disconnected peer", addr);
}
}