Mirror of https://github.com/lightningdevkit/rust-lightning.git (synced 2025-03-10 13:35:38 +01:00)
Drop dep tokio's io-util feat as it broke MSRV and isn't useful
We use `tokio`'s `io-util` feature to provide the `Async{Read,Write}Ext` traits, which allow us to simply launch a read future or `poll_write` directly as well as `split` the `TcpStream` into a read/write half.

However, these traits aren't actually doing much for us - they are really just wrapping the `readable` future (which we can trivially use ourselves) and `poll_write` isn't doing anything for us that `poll_write_ready` can't.

Similarly, the split logic is actually just `Arc`ing the `TcpStream` and busy-waiting when an operation is busy to prevent concurrent reads/writes. However, there's no reason to prevent concurrent access at the stream level - we aren't ever concurrently writing or reading (though we may concurrently read and write, which is fine).

Worse, the `io-util` feature broke MSRV (though they're likely to fix this upstream) and carries two additional dependencies (only one on the latest upstream tokio).

Thus, we simply drop the dependency here.

Fixes #2527.
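To make the change concrete, here is a minimal, illustrative sketch (not code from this commit; the function names, buffer size, and error handling are invented for the example) of the readiness-based pattern described above: a shared `Arc<TcpStream>` is driven with `readable()` plus `try_read` instead of `AsyncReadExt::read`, and writes go through `try_write` after a write-readiness check instead of `poll_write`, treating `WouldBlock` as a spurious wakeup.

```rust
use std::sync::Arc;
use tokio::net::TcpStream;

// Read side: wait for readiness, then do a non-blocking read. This needs only
// tokio's "net" feature, not "io-util".
async fn read_loop(stream: Arc<TcpStream>) -> std::io::Result<()> {
    let mut buf = [0u8; 4096];
    loop {
        // Resolves once the socket reports read-readiness.
        stream.readable().await?;
        // readable() may wake spuriously, so WouldBlock just means "wait again".
        match stream.try_read(&mut buf) {
            Ok(0) => return Ok(()), // peer closed the connection
            Ok(n) => {
                // Hand &buf[..n] to whatever consumes the bytes.
                let _received = &buf[..n];
            },
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
}

// Write side: attempt a non-blocking write and report how many bytes the
// kernel accepted. On an error (including WouldBlock) report 0; the caller
// should register for a write-readiness wakeup (e.g. via poll_write_ready)
// and retry later.
fn try_send(stream: &TcpStream, data: &[u8]) -> usize {
    match stream.try_write(data) {
        Ok(written) => written,
        Err(_) => 0,
    }
}
```

Because `try_read` and `try_write` take `&self`, the read future and the write path can each hold a clone of the same `Arc<TcpStream>`, which is what makes dropping `io::split` (and with it the `io-util` feature) possible.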
Parent: 61d896d519
Commit: eb882a69b6
2 changed files with 37 additions and 42 deletions
lightning-net-tokio/Cargo.toml
@@ -17,8 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.116", path = "../lightning" }
-tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
+tokio = { version = "1.0", features = [ "rt", "sync", "net", "time" ] }
 
 [dev-dependencies]
-tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
+tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
 lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
lightning-net-tokio/src/lib.rs
@@ -31,10 +31,10 @@
 
 use bitcoin::secp256k1::PublicKey;
 
-use tokio::net::TcpStream;
+use tokio::net::{tcp, TcpStream};
 use tokio::{io, time};
 use tokio::sync::mpsc;
-use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -59,7 +59,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 // define a trivial two- and three- select macro with the specific types we need and just use that.
 
 pub(crate) enum SelectorOutput {
-	A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
+	A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
 }
 
 pub(crate) struct TwoSelector<
@@ -87,7 +87,7 @@ impl<
 }
 
 pub(crate) struct ThreeSelector<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > {
 	pub a: A,
 	pub b: B,
@@ -95,7 +95,7 @@ pub(crate) struct ThreeSelector<
 }
 
 impl<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > Future for ThreeSelector<A, B, C> {
 	type Output = SelectorOutput;
 	fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
@@ -119,7 +119,7 @@ impl<
 /// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
 /// read future (which is returned by schedule_read).
 struct Connection {
-	writer: Option<io::WriteHalf<TcpStream>>,
+	writer: Option<Arc<TcpStream>>,
 	// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
 	// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
 	// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -156,7 +156,7 @@ impl Connection {
 	async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
 		peer_manager: PM,
 		us: Arc<Mutex<Self>>,
-		mut reader: io::ReadHalf<TcpStream>,
+		reader: Arc<TcpStream>,
 		mut read_wake_receiver: mpsc::Receiver<()>,
 		mut write_avail_receiver: mpsc::Receiver<()>,
 	) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
@@ -200,7 +200,7 @@ impl Connection {
 				ThreeSelector {
 					a: Box::pin(write_avail_receiver.recv()),
 					b: Box::pin(read_wake_receiver.recv()),
-					c: Box::pin(reader.read(&mut buf)),
+					c: Box::pin(reader.readable()),
 				}.await
 			};
 			match select_result {
@@ -211,8 +211,9 @@ impl Connection {
 					}
 				},
 				SelectorOutput::B(_) => {},
-				SelectorOutput::C(read) => {
-					match read {
+				SelectorOutput::C(res) => {
+					if res.is_err() { break Disconnect::PeerDisconnected; }
+					match reader.try_read(&mut buf) {
 						Ok(0) => break Disconnect::PeerDisconnected,
 						Ok(len) => {
 							let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
@@ -226,7 +227,11 @@ impl Connection {
 								Err(_) => break Disconnect::CloseConnection,
 							}
 						},
-						Err(_) => break Disconnect::PeerDisconnected,
+						Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+							// readable() is allowed to spuriously wake, so we have to handle
+							// WouldBlock here.
+						},
+						Err(e) => break Disconnect::PeerDisconnected,
 					}
 				},
 			}
@@ -239,18 +244,14 @@ impl Connection {
 			// here.
 			let _ = tokio::task::yield_now().await;
 		};
-		let writer_option = us.lock().unwrap().writer.take();
-		if let Some(mut writer) = writer_option {
-			// If the socket is already closed, shutdown() will fail, so just ignore it.
-			let _ = writer.shutdown().await;
-		}
+		us.lock().unwrap().writer.take();
 		if let Disconnect::PeerDisconnected = disconnect_type {
 			peer_manager.as_ref().socket_disconnected(&our_descriptor);
 			peer_manager.as_ref().process_events();
 		}
 	}
 
-	fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+	fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
 		// We only ever need a channel of depth 1 here: if we returned a non-full write to the
 		// PeerManager, we will eventually get notified that there is room in the socket to write
 		// new bytes, which will generate an event. That event will be popped off the queue before
@@ -262,11 +263,11 @@ impl Connection {
 		// false.
 		let (read_waker, read_receiver) = mpsc::channel(1);
 		stream.set_nonblocking(true).unwrap();
-		let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());
+		let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
 
-		(reader, write_receiver, read_receiver,
+		(Arc::clone(&tokio_stream), write_receiver, read_receiver,
 			Arc::new(Mutex::new(Self {
-				writer: Some(writer), write_avail, read_waker, read_paused: false,
+				writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
 				rl_requested_disconnect: false,
 				id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
 			})))
@@ -462,9 +463,9 @@ impl SocketDescriptor {
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
 	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
-		// To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
-		// writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
-		// a SocketDescriptor in it which can wake up the write_avail Sender, waking up the
+		// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
+		// there's room in the kernel buffer, or otherwise create a new Waker with a
+		// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
 		// processing future which will call write_buffer_space_avail and we'll end up back here.
 		let mut us = self.conn.lock().unwrap();
 		if us.writer.is_none() {
@@ -484,24 +485,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 		let mut ctx = task::Context::from_waker(&waker);
 		let mut written_len = 0;
 		loop {
-			match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
-				task::Poll::Ready(Ok(res)) => {
-					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
-					// know how to handle it if it does (cause it should be a Poll::Pending
-					// instead):
-					assert_ne!(res, 0);
-					written_len += res;
-					if written_len == data.len() { return written_len; }
-				},
-				task::Poll::Ready(Err(e)) => {
-					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
-					// know how to handle it if it does (cause it should be a Poll::Pending
-					// instead):
-					assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
-					// Probably we've already been closed, just return what we have and let the
-					// read thread handle closing logic.
-					return written_len;
+			match us.writer.as_ref().unwrap().poll_write_ready(&mut ctx) {
+				task::Poll::Ready(Ok(())) => {
+					match us.writer.as_ref().unwrap().try_write(&data[written_len..]) {
+						Ok(res) => {
+							debug_assert_ne!(res, 0);
+							written_len += res;
+							if written_len == data.len() { return written_len; }
+						},
+						Err(e) => return written_len,
+					}
 				},
+				task::Poll::Ready(Err(e)) => return written_len,
 				task::Poll::Pending => {
 					// We're queued up for a write event now, but we need to make sure we also
 					// pause read given we're now waiting on the remote end to ACK (and in