mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-24 23:08:36 +01:00
lightning-net-tokio: Allow custom smart pointers
This commit is contained in:
parent
5023ff05a8
commit
5019b31b28
1 changed files with 51 additions and 25 deletions
|
@ -84,6 +84,7 @@ use lightning::ln::peer_handler::CustomMessageHandler;
|
|||
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
|
||||
use lightning::util::logger::Logger;
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::task;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpStream as StdTcpStream;
|
||||
|
@ -120,11 +121,16 @@ struct Connection {
|
|||
id: u64,
|
||||
}
|
||||
impl Connection {
|
||||
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
|
||||
CMH: Deref + 'static + Send + Sync,
|
||||
RMH: Deref + 'static + Send + Sync,
|
||||
L: Deref + 'static + Send + Sync,
|
||||
UMH: Deref + 'static + Send + Sync,
|
||||
CMH::Target: ChannelMessageHandler + Send + Sync,
|
||||
RMH::Target: RoutingMessageHandler + Send + Sync,
|
||||
L::Target: Logger + Send + Sync,
|
||||
UMH::Target: CustomMessageHandler + Send + Sync,
|
||||
{
|
||||
loop {
|
||||
if event_receiver.recv().await.is_none() {
|
||||
return;
|
||||
|
@ -133,11 +139,16 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
|
||||
CMH: Deref + 'static + Send + Sync,
|
||||
RMH: Deref + 'static + Send + Sync,
|
||||
L: Deref + 'static + Send + Sync,
|
||||
UMH: Deref + 'static + Send + Sync,
|
||||
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L::Target: Logger + 'static + Send + Sync,
|
||||
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
|
||||
{
|
||||
// Create a waker to wake up poll_event_process, above
|
||||
let (event_waker, event_receiver) = mpsc::channel(1);
|
||||
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
|
||||
|
@ -255,11 +266,16 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
|
|||
/// The returned future will complete when the peer is disconnected and associated handling
|
||||
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
|
||||
/// not need to poll the provided future in order to make progress.
|
||||
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
|
||||
CMH: Deref + 'static + Send + Sync,
|
||||
RMH: Deref + 'static + Send + Sync,
|
||||
L: Deref + 'static + Send + Sync,
|
||||
UMH: Deref + 'static + Send + Sync,
|
||||
CMH::Target: ChannelMessageHandler + Send + Sync,
|
||||
RMH::Target: RoutingMessageHandler + Send + Sync,
|
||||
L::Target: Logger + Send + Sync,
|
||||
UMH::Target: CustomMessageHandler + Send + Sync,
|
||||
{
|
||||
let remote_addr = get_addr_from_stream(&stream);
|
||||
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
|
||||
#[cfg(debug_assertions)]
|
||||
|
@ -297,11 +313,16 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
|
|||
/// The returned future will complete when the peer is disconnected and associated handling
|
||||
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
|
||||
/// not need to poll the provided future in order to make progress.
|
||||
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
|
||||
CMH: Deref + 'static + Send + Sync,
|
||||
RMH: Deref + 'static + Send + Sync,
|
||||
L: Deref + 'static + Send + Sync,
|
||||
UMH: Deref + 'static + Send + Sync,
|
||||
CMH::Target: ChannelMessageHandler + Send + Sync,
|
||||
RMH::Target: RoutingMessageHandler + Send + Sync,
|
||||
L::Target: Logger + Send + Sync,
|
||||
UMH::Target: CustomMessageHandler + Send + Sync,
|
||||
{
|
||||
let remote_addr = get_addr_from_stream(&stream);
|
||||
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
|
||||
#[cfg(debug_assertions)]
|
||||
|
@ -368,11 +389,16 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
|
|||
/// disconnected and associated handling futures are freed, though, because all processing in said
|
||||
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
|
||||
/// make progress.
|
||||
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
|
||||
CMH: Deref + 'static + Send + Sync,
|
||||
RMH: Deref + 'static + Send + Sync,
|
||||
L: Deref + 'static + Send + Sync,
|
||||
UMH: Deref + 'static + Send + Sync,
|
||||
CMH::Target: ChannelMessageHandler + Send + Sync,
|
||||
RMH::Target: RoutingMessageHandler + Send + Sync,
|
||||
L::Target: Logger + Send + Sync,
|
||||
UMH::Target: CustomMessageHandler + Send + Sync,
|
||||
{
|
||||
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
|
||||
Some(setup_outbound(peer_manager, their_node_id, stream))
|
||||
} else { None }
|
||||
|
|
Loading…
Add table
Reference in a new issue