Update ChannelManager's ChannelMonitor Arc to be a Deref

Additional changes:
* Update fuzz crate to match ChannelManager's new API
* Update lightning-net-tokio library to match ChannelManager's new ChannelMonitor Deref API
* Update tests to match ChannelManager's new ChannelMonitor Deref API
This commit is contained in:
Valentine Wallace 2020-01-16 13:26:38 -05:00 committed by Matt Corallo
parent 9a02115437
commit 4833d1acf9
11 changed files with 606 additions and 277 deletions

View File

@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
impl TestChannelMonitor {
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
Self {
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
update_ret: Mutex::new(Ok(())),
latest_good_update: Mutex::new(HashMap::new()),
latest_update_good: Mutex::new(HashMap::new()),
@ -190,7 +190,7 @@ pub fn do_test(data: &[u8]) {
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
monitor)
} }
}
@ -221,14 +221,14 @@ pub fn do_test(data: &[u8]) {
let read_args = ChannelManagerReadArgs {
keys_manager,
fee_estimator: fee_est.clone(),
monitor: monitor.clone(),
monitor: monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>,
tx_broadcaster: broadcast.clone(),
logger,
default_config: config,
channel_monitors: &mut monitor_refs,
};
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
if !was_good {
// If the last time we updated a monitor we didn't successfully update (and we

View File

@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
}
struct MoneyLossDetector<'a> {
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
handler: PeerManager<Peer<'a>>,
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>,
peers: &'a RefCell<[bool; 256]>,
funding_txn: Vec<Transaction>,
@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
blocks_connected: u32,
}
impl<'a> MoneyLossDetector<'a> {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>) -> Self {
MoneyLossDetector {
manager,
monitor,
@ -320,14 +320,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
let broadcast = Arc::new(TestBroadcaster{});
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
let mut config = UserConfig::default();
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
let peers = RefCell::new([false; 256]);

View File

@ -19,6 +19,7 @@ use tokio::net::TcpStream;
use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::msgs::ChannelMessageHandler;
use std::mem;
use std::net::SocketAddr;
@ -42,7 +43,7 @@ pub struct Connection {
id: u64,
}
impl Connection {
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
let us_ref = us.clone();
let us_close_ref = us.clone();
let peer_manager_ref = peer_manager.clone();
@ -110,7 +111,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);
if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@ -124,7 +125,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@ -142,7 +143,7 @@ impl Connection {
///
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
});
@ -157,19 +158,18 @@ impl Connection {
}
}
#[derive(Clone)]
pub struct SocketDescriptor {
pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
conn: Arc<Mutex<Connection>>,
id: u64,
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
}
impl SocketDescriptor {
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
let id = conn.lock().unwrap().id;
Self { conn, id, peer_manager }
}
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
macro_rules! schedule_read {
($us_ref: expr) => {
@ -256,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
us.read_paused = true;
}
}
impl Eq for SocketDescriptor {}
impl PartialEq for SocketDescriptor {
impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
fn clone(&self) -> Self {
Self {
conn: Arc::clone(&self.conn),
id: self.id,
peer_manager: Arc::clone(&self.peer_manager),
}
}
}
impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
fn eq(&self, o: &Self) -> bool {
self.id == o.id
}
}
impl Hash for SocketDescriptor {
impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}

View File

@ -14,9 +14,11 @@ use bitcoin::network::constants::Network;
use util::logger::Logger;
use std::sync::{Mutex,Weak,MutexGuard,Arc};
use std::sync::{Mutex, MutexGuard, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashSet;
use std::ops::Deref;
use std::marker::PhantomData;
/// Used to give chain error details upstream
pub enum ChainError {
@ -205,26 +207,48 @@ impl ChainWatchedUtil {
}
}
/// BlockNotifierArc is useful when you need a BlockNotifier that points to ChainListeners with
/// static lifetimes, e.g. when you're using lightning-net-tokio (since tokio::spawn requires
/// parameters with static lifetimes). Other times you can afford a reference, which is more
/// efficient, in which case BlockNotifierRef is a more appropriate type. Defining these type
/// aliases prevents issues such as overly long function definitions.
pub type BlockNotifierArc = Arc<BlockNotifier<'static, Arc<ChainListener>>>;
/// BlockNotifierRef is useful when you want a BlockNotifier that points to ChainListeners
/// with nonstatic lifetimes. This is useful for when static lifetimes are not needed. Nonstatic
/// lifetimes are more efficient but less flexible, and should be used by default unless static
/// lifetimes are required, e.g. when you're using lightning-net-tokio (since tokio::spawn
/// requires parameters with static lifetimes), in which case BlockNotifierArc is a more
/// appropriate type. Defining these type aliases for common usages prevents issues such as
/// overly long function definitions.
pub type BlockNotifierRef<'a> = BlockNotifier<'a, &'a ChainListener>;
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
/// data is registered.
pub struct BlockNotifier {
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
///
/// Rather than using a plain BlockNotifier, it is preferable to use either a BlockNotifierArc
/// or a BlockNotifierRef for conciseness. See their documentation for more details, but essentially
/// you should default to using a BlockNotifierRef, and use a BlockNotifierArc instead when you
/// require ChainListeners with static lifetimes, such as when you're using lightning-net-tokio.
pub struct BlockNotifier<'a, CL: Deref<Target = ChainListener + 'a> + 'a> {
listeners: Mutex<Vec<CL>>,
chain_monitor: Arc<ChainWatchInterface>,
phantom: PhantomData<&'a ()>,
}
impl BlockNotifier {
impl<'a, CL: Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
/// Constructs a new BlockNotifier without any listeners.
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
BlockNotifier {
listeners: Mutex::new(Vec::new()),
chain_monitor,
phantom: PhantomData,
}
}
/// Register the given listener to receive events. Only a weak pointer is provided and
/// the registration should be freed once that pointer expires.
/// Register the given listener to receive events.
// TODO: unregister
pub fn register_listener(&self, listener: Weak<ChainListener>) {
pub fn register_listener(&self, listener: CL) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}
@ -250,12 +274,9 @@ impl BlockNotifier {
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
let last_seen = self.chain_monitor.reentered();
let listeners = self.listeners.lock().unwrap().clone();
let listeners = self.listeners.lock().unwrap();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
None => ()
}
listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
}
return last_seen != self.chain_monitor.reentered();
}
@ -263,12 +284,9 @@ impl BlockNotifier {
/// Notify listeners that a block was disconnected.
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
let listeners = self.listeners.lock().unwrap().clone();
let listeners = self.listeners.lock().unwrap();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(&header, disconnected_height),
None => ()
}
listener.block_disconnected(&header, disconnected_height);
}
}

View File

@ -19,11 +19,13 @@ use ln::functional_test_utils::*;
#[test]
fn test_simple_monitor_permanent_update_fail() {
// Test that we handle a simple permanent monitor update failure
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
let (_, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
let (_, payment_hash_1) = get_payment_preimage_hash!(&nodes[0]);
*nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure);
if let Err(APIError::ChannelUnavailable {..}) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); }
@ -49,11 +51,13 @@ fn test_simple_monitor_permanent_update_fail() {
fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
// Test that we can recover from a simple temporary monitor update failure optionally with
// a disconnect in between
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(&nodes[0]);
*nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure);
if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); }
@ -95,7 +99,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1, 1_000_000);
// Now set it to failed again...
let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
let (_, payment_hash_2) = get_payment_preimage_hash!(&nodes[0]);
*nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure);
if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); }
check_added_monitors!(nodes[0], 1);
@ -148,7 +152,9 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
// * We then walk through more message exchanges to get the original update_add_htlc
// through, swapping message ordering based on disconnect_count & 8 and optionally
// disconnect/reconnecting based on disconnect_count.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
@ -474,7 +480,9 @@ fn test_monitor_temporary_update_fail_c() {
#[test]
fn test_monitor_update_fail_cs() {
// Tests handling of a monitor update failure when processing an incoming commitment_signed
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
@ -553,7 +561,9 @@ fn test_monitor_update_fail_no_rebroadcast() {
// Tests handling of a monitor update failure when no message rebroadcasting on
// test_restore_channel_monitor() is required. Backported from
// chanmon_fail_consistency fuzz tests.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
@ -595,7 +605,9 @@ fn test_monitor_update_fail_no_rebroadcast() {
fn test_monitor_update_raa_while_paused() {
// Tests handling of an RAA while monitor updating has already been marked failed.
// Backported from chanmon_fail_consistency fuzz tests as this used to be broken.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
send_payment(&nodes[0], &[&nodes[1]], 5000000, 5_000_000);
@ -662,7 +674,9 @@ fn test_monitor_update_raa_while_paused() {
fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
// Tests handling of a monitor update failure when processing an incoming RAA
let mut nodes = create_network(3, &[None, None, None]);
let node_cfgs = create_node_cfgs(3);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported());
@ -915,7 +929,9 @@ fn test_monitor_update_fail_reestablish() {
// Simple test for message retransmission after monitor update failure on
// channel_reestablish generating a monitor update (which comes from freeing holding cell
// HTLCs).
let mut nodes = create_network(3, &[None, None, None]);
let node_cfgs = create_node_cfgs(3);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported());
@ -993,7 +1009,9 @@ fn raa_no_response_awaiting_raa_state() {
// due to a previous monitor update failure, we still set AwaitingRemoteRevoke on the channel
// in question (assuming it intends to respond with a CS after monitor updating is restored).
// Backported from chanmon_fail_consistency fuzz tests as this used to be broken.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
@ -1106,7 +1124,9 @@ fn claim_while_disconnected_monitor_update_fail() {
// Backported from chanmon_fail_consistency fuzz tests as an unmerged version of the handling
// code introduced a regression in this test (specifically, this caught a removal of the
// channel_reestablish handling ensuring the order was sensical given the messages used).
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
// Forward a payment for B to claim
@ -1221,7 +1241,9 @@ fn monitor_failed_no_reestablish_response() {
// response to a commitment_signed.
// Backported from chanmon_fail_consistency fuzz tests as it caught a long-standing
// debug_assert!() failure in channel_reestablish handling.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
// Route the payment and deliver the initial commitment_signed (with a monitor update failure
@ -1287,7 +1309,9 @@ fn first_message_on_recv_ordering() {
// have no pending response but will want to send a RAA/CS (with the updates for the second
// payment applied).
// Backported from chanmon_fail_consistency fuzz tests as it caught a bug here.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
// Route the first payment outbound, holding the last RAA for B until we are set up so that we
@ -1372,7 +1396,9 @@ fn test_monitor_update_fail_claim() {
// update to claim the payment. We then send a payment C->B->A, making the forward of this
// payment from B to A fail due to the paused channel. Finally, we restore the channel monitor
// updating and claim the payment on B.
let mut nodes = create_network(3, &[None, None, None]);
let node_cfgs = create_node_cfgs(3);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported());
@ -1445,7 +1471,9 @@ fn test_monitor_update_on_pending_forwards() {
// We do this with a simple 3-node network, sending a payment from A to C and one from C to A.
// The payment from A to C will be failed by C and pending a back-fail to A, while the payment
// from C to A will be pending a forward to A.
let mut nodes = create_network(3, &[None, None, None]);
let node_cfgs = create_node_cfgs(3);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported());
@ -1510,7 +1538,9 @@ fn monitor_update_claim_fail_no_response() {
// to channel being AwaitingRAA).
// Backported from chanmon_fail_consistency fuzz tests as an unmerged version of the handling
// code was broken.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported());
// Forward a payment for B to claim
@ -1569,7 +1599,9 @@ fn monitor_update_claim_fail_no_response() {
fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails: bool, fail_on_signed: bool, confirm_a_first: bool, restore_b_before_conf: bool) {
// Test that if the monitor update generated by funding_transaction_generated fails we continue
// the channel setup happily after the update is restored.
let mut nodes = create_network(2, &[None, None]);
let node_cfgs = create_node_cfgs(2);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100000, 10001, 43).unwrap();
nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), InitFeatures::supported(), &get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()));

View File

@ -34,7 +34,7 @@ use ln::features::InitFeatures;
use ln::msgs;
use ln::onion_utils;
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
use chain::keysinterface::{ChannelKeys, KeysInterface};
use chain::keysinterface::{ChannelKeys, KeysInterface, InMemoryChannelKeys};
use util::config::UserConfig;
use util::{byte_utils, events};
use util::ser::{Readable, ReadableArgs, Writeable, Writer};
@ -48,6 +48,8 @@ use std::io::Cursor;
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::marker::{Sync, Send};
use std::ops::Deref;
const SIXTY_FIVE_ZEROS: [u8; 65] = [0; 65];
@ -284,6 +286,21 @@ struct PeerState {
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height";
/// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefChannelManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
pub type SimpleArcChannelManager<M> = Arc<ChannelManager<InMemoryChannelKeys, Arc<M>>>;
/// SimpleRefChannelManager is a type alias for a ChannelManager reference, and is the reference
/// counterpart to the SimpleArcChannelManager type alias. Use this type by default when you don't
/// need a ChannelManager with a static lifetime. You'll need a static lifetime in cases such as
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
pub type SimpleRefChannelManager<'a, M> = ChannelManager<InMemoryChannelKeys, &'a M>;
/// Manager which keeps track of a number of channels and sends messages to the appropriate
/// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
///
@ -313,12 +330,18 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum
/// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid
/// spam due to quick disconnection/reconnection, updates are not sent until the channel has been
/// offline for a full minute. In order to track this, you must call
/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfec.
pub struct ChannelManager<ChanSigner: ChannelKeys> {
/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfect.
///
/// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
/// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
/// essentially you should default to using a SimpleRefChannelManager, and use a
/// SimpleArcChannelManager when you require a ChannelManager with a static lifetime, such as when
/// you're using lightning-net-tokio.
pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref> where M::Target: ManyChannelMonitor {
default_configuration: UserConfig,
genesis_hash: Sha256dHash,
fee_estimator: Arc<FeeEstimator>,
monitor: Arc<ManyChannelMonitor>,
monitor: M,
tx_broadcaster: Arc<BroadcasterInterface>,
#[cfg(test)]
@ -586,7 +609,7 @@ macro_rules! maybe_break_monitor_err {
}
}
impl<ChanSigner: ChannelKeys> ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys, M: Deref> ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
/// Constructs a new ChannelManager to hold several channels and route between them.
///
/// This is the main "logic hub" for all channel-related actions, and implements
@ -605,14 +628,14 @@ impl<ChanSigner: ChannelKeys> ChannelManager<ChanSigner> {
/// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's
/// `block_(dis)connected` methods, which will notify all registered listeners in one
/// go.
pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface<ChanKeySigner = ChanSigner>>, config: UserConfig, current_blockchain_height: usize) -> Result<Arc<ChannelManager<ChanSigner>>, secp256k1::Error> {
pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: M, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface<ChanKeySigner = ChanSigner>>, config: UserConfig, current_blockchain_height: usize) -> Result<ChannelManager<ChanSigner, M>, secp256k1::Error> {
let secp_ctx = Secp256k1::new();
let res = Arc::new(ChannelManager {
let res = ChannelManager {
default_configuration: config.clone(),
genesis_hash: genesis_block(network).header.bitcoin_hash(),
fee_estimator: feeest.clone(),
monitor: monitor.clone(),
monitor,
tx_broadcaster,
latest_block_height: AtomicUsize::new(current_blockchain_height),
@ -636,7 +659,7 @@ impl<ChanSigner: ChannelKeys> ChannelManager<ChanSigner> {
keys_manager,
logger,
});
};
Ok(res)
}
@ -2484,7 +2507,7 @@ impl<ChanSigner: ChannelKeys> ChannelManager<ChanSigner> {
}
}
impl<ChanSigner: ChannelKeys> events::MessageSendEventsProvider for ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
// user to serialize a ChannelManager with pending events in it and lose those events on
@ -2509,7 +2532,7 @@ impl<ChanSigner: ChannelKeys> events::MessageSendEventsProvider for ChannelManag
}
}
impl<ChanSigner: ChannelKeys> events::EventsProvider for ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
// user to serialize a ChannelManager with pending events in it and lose those events on
@ -2534,7 +2557,7 @@ impl<ChanSigner: ChannelKeys> events::EventsProvider for ChannelManager<ChanSign
}
}
impl<ChanSigner: ChannelKeys> ChainListener for ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send> ChainListener for ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
let header_hash = header.bitcoin_hash();
log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len());
@ -2648,7 +2671,7 @@ impl<ChanSigner: ChannelKeys> ChainListener for ChannelManager<ChanSigner> {
}
}
impl<ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send> ChannelMessageHandler for ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_open_channel(their_node_id, their_features, msg);
@ -3118,7 +3141,7 @@ impl<R: ::std::io::Read> Readable<R> for HTLCForwardInfo {
}
}
impl<ChanSigner: ChannelKeys + Writeable> Writeable for ChannelManager<ChanSigner> {
impl<ChanSigner: ChannelKeys + Writeable, M: Deref> Writeable for ChannelManager<ChanSigner, M> where M::Target: ManyChannelMonitor {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
let _ = self.total_consistency_lock.write().unwrap();
@ -3189,7 +3212,7 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for ChannelManager<ChanSigne
/// 5) Move the ChannelMonitors into your local ManyChannelMonitor.
/// 6) Disconnect/connect blocks on the ChannelManager.
/// 7) Register the new ChannelManager with your ChainWatchInterface.
pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys> {
pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys, M: Deref> where M::Target: ManyChannelMonitor {
/// The keys provider which will give us relevant keys. Some keys will be loaded during
/// deserialization.
pub keys_manager: Arc<KeysInterface<ChanKeySigner = ChanSigner>>,
@ -3203,7 +3226,7 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys> {
/// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that
/// you have deserialized ChannelMonitors separately and will add them to your
/// ManyChannelMonitor after deserializing this ChannelManager.
pub monitor: Arc<ManyChannelMonitor>,
pub monitor: M,
/// The BroadcasterInterface which will be used in the ChannelManager in the future and may be
/// used to broadcast the latest local commitment transactions of channels which must be
@ -3229,8 +3252,8 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys> {
pub channel_monitors: &'a mut HashMap<OutPoint, &'a mut ChannelMonitor>,
}
impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R, ChannelManagerReadArgs<'a, ChanSigner>> for (Sha256dHash, ChannelManager<ChanSigner>) {
fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner>) -> Result<Self, DecodeError> {
impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref> ReadableArgs<R, ChannelManagerReadArgs<'a, ChanSigner, M>> for (Sha256dHash, ChannelManager<ChanSigner, M>) where M::Target: ManyChannelMonitor {
fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result<Self, DecodeError> {
let _ver: u8 = Readable::read(reader)?;
let min_ver: u8 = Readable::read(reader)?;
if min_ver > SERIALIZATION_VERSION {

View File

@ -152,7 +152,6 @@ pub struct SimpleManyChannelMonitor<Key> {
}
impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
@ -219,8 +218,8 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelM
impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key> {
/// Creates a new object which can be used to monitor several channels given the chain
/// interface with which to register to receive notifications.
pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<FeeEstimator>) -> Arc<SimpleManyChannelMonitor<Key>> {
let res = Arc::new(SimpleManyChannelMonitor {
pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<FeeEstimator>) -> SimpleManyChannelMonitor<Key> {
let res = SimpleManyChannelMonitor {
monitors: Mutex::new(HashMap::new()),
chain_monitor,
broadcaster,
@ -228,7 +227,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key>
pending_htlc_updated: Mutex::new(HashMap::new()),
logger,
fee_estimator: feeest,
});
};
res
}

View File

@ -11,6 +11,7 @@ use ln::msgs;
use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
use util::enforcing_trait_impls::EnforcingChannelKeys;
use util::test_utils;
use util::test_utils::TestChannelMonitor;
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::logger::Logger;
@ -36,7 +37,7 @@ use std::sync::{Arc, Mutex};
use std::mem;
pub const CHAN_CONFIRM_DEPTH: u32 = 100;
pub fn confirm_transaction(notifier: &chaininterface::BlockNotifier, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
pub fn confirm_transaction<'a, 'b: 'a>(notifier: &'a chaininterface::BlockNotifierRef<'b>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
assert!(chain.does_match_tx(tx));
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
notifier.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
@ -46,7 +47,7 @@ pub fn confirm_transaction(notifier: &chaininterface::BlockNotifier, chain: &cha
}
}
pub fn connect_blocks(notifier: &chaininterface::BlockNotifier, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
pub fn connect_blocks<'a, 'b>(notifier: &'a chaininterface::BlockNotifierRef<'b>, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
let mut header = BlockHeader { version: 0x2000000, prev_blockhash: if parent { prev_blockhash } else { Default::default() }, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
notifier.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
for i in 2..depth + 1 {
@ -56,20 +57,31 @@ pub fn connect_blocks(notifier: &chaininterface::BlockNotifier, depth: u32, heig
header.bitcoin_hash()
}
pub struct Node {
pub block_notifier: Arc<chaininterface::BlockNotifier>,
pub struct NodeCfg {
pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
pub chan_monitor: Arc<test_utils::TestChannelMonitor>,
pub fee_estimator: Arc<test_utils::TestFeeEstimator>,
pub chan_monitor: test_utils::TestChannelMonitor,
pub keys_manager: Arc<test_utils::TestKeysInterface>,
pub node: Arc<ChannelManager<EnforcingChannelKeys>>,
pub logger: Arc<test_utils::TestLogger>,
pub node_seed: [u8; 32],
}
pub struct Node<'a, 'b: 'a> {
pub block_notifier: chaininterface::BlockNotifierRef<'b>,
pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
pub chan_monitor: &'b test_utils::TestChannelMonitor,
pub keys_manager: Arc<test_utils::TestKeysInterface>,
pub node: &'a ChannelManager<EnforcingChannelKeys, &'b TestChannelMonitor>,
pub router: Router,
pub node_seed: [u8; 32],
pub network_payment_count: Rc<RefCell<u8>>,
pub network_chan_count: Rc<RefCell<u32>>,
pub logger: Arc<test_utils::TestLogger>
}
impl Drop for Node {
impl<'a, 'b> Drop for Node<'a, 'b> {
fn drop(&mut self) {
if !::std::thread::panicking() {
// Check that we processed all pending events
@ -80,11 +92,11 @@ impl Drop for Node {
}
}
pub fn create_chan_between_nodes(node_a: &Node, node_b: &Node, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
pub fn create_chan_between_nodes<'a, 'b, 'c>(node_a: &'a Node<'b, 'c>, node_b: &'a Node<'b, 'c>, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
create_chan_between_nodes_with_value(node_a, node_b, 100000, 10001, a_flags, b_flags)
}
pub fn create_chan_between_nodes_with_value(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
pub fn create_chan_between_nodes_with_value<'a, 'b, 'c>(node_a: &'a Node<'b, 'c>, node_b: &'a Node<'b, 'c>, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
let (funding_locked, channel_id, tx) = create_chan_between_nodes_with_value_a(node_a, node_b, channel_value, push_msat, a_flags, b_flags);
let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(node_a, node_b, &funding_locked);
(announcement, as_update, bs_update, channel_id, tx)
@ -159,7 +171,7 @@ macro_rules! get_feerate {
}
}
pub fn create_funding_transaction(node: &Node, expected_chan_value: u64, expected_user_chan_id: u64) -> ([u8; 32], Transaction, OutPoint) {
pub fn create_funding_transaction<'a, 'b>(node: &Node<'a, 'b>, expected_chan_value: u64, expected_user_chan_id: u64) -> ([u8; 32], Transaction, OutPoint) {
let chan_id = *node.network_chan_count.borrow();
let events = node.node.get_and_clear_pending_events();
@ -179,7 +191,7 @@ pub fn create_funding_transaction(node: &Node, expected_chan_value: u64, expecte
}
}
pub fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> Transaction {
pub fn create_chan_between_nodes_with_value_init<'a, 'b>(node_a: &Node<'a, 'b>, node_b: &Node<'a, 'b>, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> Transaction {
node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42).unwrap();
node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), a_flags, &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id()));
node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), b_flags, &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id()));
@ -223,12 +235,12 @@ pub fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, c
tx
}
pub fn create_chan_between_nodes_with_value_confirm_first(node_recv: &Node, node_conf: &Node, tx: &Transaction) {
pub fn create_chan_between_nodes_with_value_confirm_first<'a, 'b, 'c>(node_recv: &'a Node<'a, 'b>, node_conf: &'a Node<'a, 'b>, tx: &Transaction) {
confirm_transaction(&node_conf.block_notifier, &node_conf.chain_monitor, &tx, tx.version);
node_recv.node.handle_funding_locked(&node_conf.node.get_our_node_id(), &get_event_msg!(node_conf, MessageSendEvent::SendFundingLocked, node_recv.node.get_our_node_id()));
}
pub fn create_chan_between_nodes_with_value_confirm_second(node_recv: &Node, node_conf: &Node) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
pub fn create_chan_between_nodes_with_value_confirm_second<'a, 'b>(node_recv: &Node<'a, 'b>, node_conf: &Node<'a, 'b>) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
let channel_id;
let events_6 = node_conf.node.get_and_clear_pending_msg_events();
assert_eq!(events_6.len(), 2);
@ -248,19 +260,19 @@ pub fn create_chan_between_nodes_with_value_confirm_second(node_recv: &Node, nod
}), channel_id)
}
pub fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
pub fn create_chan_between_nodes_with_value_confirm<'a, 'b, 'c>(node_a: &'a Node<'b, 'c>, node_b: &'a Node<'b, 'c>, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
create_chan_between_nodes_with_value_confirm_first(node_a, node_b, tx);
confirm_transaction(&node_a.block_notifier, &node_a.chain_monitor, &tx, tx.version);
create_chan_between_nodes_with_value_confirm_second(node_b, node_a)
}
pub fn create_chan_between_nodes_with_value_a(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32], Transaction) {
pub fn create_chan_between_nodes_with_value_a<'a, 'b, 'c>(node_a: &'a Node<'b, 'c>, node_b: &'a Node<'b, 'c>, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32], Transaction) {
let tx = create_chan_between_nodes_with_value_init(node_a, node_b, channel_value, push_msat, a_flags, b_flags);
let (msgs, chan_id) = create_chan_between_nodes_with_value_confirm(node_a, node_b, &tx);
(msgs, chan_id, tx)
}
pub fn create_chan_between_nodes_with_value_b(node_a: &Node, node_b: &Node, as_funding_msgs: &(msgs::FundingLocked, msgs::AnnouncementSignatures)) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate) {
pub fn create_chan_between_nodes_with_value_b<'a, 'b>(node_a: &Node<'a, 'b>, node_b: &Node<'a, 'b>, as_funding_msgs: &(msgs::FundingLocked, msgs::AnnouncementSignatures)) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate) {
node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &as_funding_msgs.0);
let bs_announcement_sigs = get_event_msg!(node_b, MessageSendEvent::SendAnnouncementSignatures, node_a.node.get_our_node_id());
node_b.node.handle_announcement_signatures(&node_a.node.get_our_node_id(), &as_funding_msgs.1);
@ -292,11 +304,11 @@ pub fn create_chan_between_nodes_with_value_b(node_a: &Node, node_b: &Node, as_f
((*announcement).clone(), (*as_update).clone(), (*bs_update).clone())
}
pub fn create_announced_chan_between_nodes(nodes: &Vec<Node>, a: usize, b: usize, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
pub fn create_announced_chan_between_nodes<'a, 'b, 'c>(nodes: &'a Vec<Node<'b, 'c>>, a: usize, b: usize, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
create_announced_chan_between_nodes_with_value(nodes, a, b, 100000, 10001, a_flags, b_flags)
}
pub fn create_announced_chan_between_nodes_with_value(nodes: &Vec<Node>, a: usize, b: usize, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
pub fn create_announced_chan_between_nodes_with_value<'a, 'b, 'c>(nodes: &'a Vec<Node<'b, 'c>>, a: usize, b: usize, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
let chan_announcement = create_chan_between_nodes_with_value(&nodes[a], &nodes[b], channel_value, push_msat, a_flags, b_flags);
for node in nodes {
assert!(node.router.handle_channel_announcement(&chan_announcement.0).unwrap());
@ -366,7 +378,7 @@ macro_rules! check_closed_broadcast {
}}
}
pub fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, Transaction) {
pub fn close_channel<'a, 'b>(outbound_node: &Node<'a, 'b>, inbound_node: &Node<'a, 'b>, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, Transaction) {
let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) };
let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
let (tx_a, tx_b);
@ -453,7 +465,7 @@ impl SendEvent {
}
}
pub fn from_node(node: &Node) -> SendEvent {
pub fn from_node<'a, 'b>(node: &Node<'a, 'b>) -> SendEvent {
let mut events = node.node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
SendEvent::from_event(events.pop().unwrap())
@ -601,7 +613,7 @@ macro_rules! expect_payment_sent {
}
}
pub fn send_along_route_with_hash(origin_node: &Node, route: Route, expected_route: &[&Node], recv_value: u64, our_payment_hash: PaymentHash) {
pub fn send_along_route_with_hash<'a, 'b>(origin_node: &Node<'a, 'b>, route: Route, expected_route: &[&Node<'a, 'b>], recv_value: u64, our_payment_hash: PaymentHash) {
let mut payment_event = {
origin_node.node.send_payment(route, our_payment_hash).unwrap();
check_added_monitors!(origin_node, 1);
@ -643,13 +655,13 @@ pub fn send_along_route_with_hash(origin_node: &Node, route: Route, expected_rou
}
}
pub fn send_along_route(origin_node: &Node, route: Route, expected_route: &[&Node], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
pub fn send_along_route<'a, 'b>(origin_node: &Node<'a, 'b>, route: Route, expected_route: &[&Node<'a, 'b>], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
let (our_payment_preimage, our_payment_hash) = get_payment_preimage_hash!(origin_node);
send_along_route_with_hash(origin_node, route, expected_route, recv_value, our_payment_hash);
(our_payment_preimage, our_payment_hash)
}
pub fn claim_payment_along_route(origin_node: &Node, expected_route: &[&Node], skip_last: bool, our_payment_preimage: PaymentPreimage, expected_amount: u64) {
pub fn claim_payment_along_route<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], skip_last: bool, our_payment_preimage: PaymentPreimage, expected_amount: u64) {
assert!(expected_route.last().unwrap().node.claim_funds(our_payment_preimage, expected_amount));
check_added_monitors!(expected_route.last().unwrap(), 1);
@ -727,13 +739,13 @@ pub fn claim_payment_along_route(origin_node: &Node, expected_route: &[&Node], s
}
}
pub fn claim_payment(origin_node: &Node, expected_route: &[&Node], our_payment_preimage: PaymentPreimage, expected_amount: u64) {
pub fn claim_payment<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], our_payment_preimage: PaymentPreimage, expected_amount: u64) {
claim_payment_along_route(origin_node, expected_route, false, our_payment_preimage, expected_amount);
}
pub const TEST_FINAL_CLTV: u32 = 32;
pub fn route_payment(origin_node: &Node, expected_route: &[&Node], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
pub fn route_payment<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), None, &Vec::new(), recv_value, TEST_FINAL_CLTV).unwrap();
assert_eq!(route.hops.len(), expected_route.len());
for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
@ -743,7 +755,7 @@ pub fn route_payment(origin_node: &Node, expected_route: &[&Node], recv_value: u
send_along_route(origin_node, route, expected_route, recv_value)
}
pub fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value: u64) {
pub fn route_over_limit<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], recv_value: u64) {
let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), None, &Vec::new(), recv_value, TEST_FINAL_CLTV).unwrap();
assert_eq!(route.hops.len(), expected_route.len());
for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
@ -759,12 +771,12 @@ pub fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value
};
}
pub fn send_payment(origin: &Node, expected_route: &[&Node], recv_value: u64, expected_value: u64) {
pub fn send_payment<'a, 'b>(origin: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], recv_value: u64, expected_value: u64) {
let our_payment_preimage = route_payment(&origin, expected_route, recv_value).0;
claim_payment(&origin, expected_route, our_payment_preimage, expected_value);
}
pub fn fail_payment_along_route(origin_node: &Node, expected_route: &[&Node], skip_last: bool, our_payment_hash: PaymentHash) {
pub fn fail_payment_along_route<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], skip_last: bool, our_payment_hash: PaymentHash) {
assert!(expected_route.last().unwrap().node.fail_htlc_backwards(&our_payment_hash));
expect_pending_htlcs_forwardable!(expected_route.last().unwrap());
check_added_monitors!(expected_route.last().unwrap(), 1);
@ -833,44 +845,59 @@ pub fn fail_payment_along_route(origin_node: &Node, expected_route: &[&Node], sk
}
}
pub fn fail_payment(origin_node: &Node, expected_route: &[&Node], our_payment_hash: PaymentHash) {
pub fn fail_payment<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], our_payment_hash: PaymentHash) {
fail_payment_along_route(origin_node, expected_route, false, our_payment_hash);
}
pub fn create_network(node_count: usize, node_config: &[Option<UserConfig>]) -> Vec<Node> {
pub fn create_node_cfgs(node_count: usize) -> Vec<NodeCfg> {
let mut nodes = Vec::new();
let mut rng = thread_rng();
let secp_ctx = Secp256k1::new();
for i in 0..node_count {
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, logger.clone() as Arc<Logger>));
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
let mut seed = [0; 32];
rng.fill_bytes(&mut seed);
let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet, logger.clone() as Arc<Logger>));
let chan_monitor = test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone());
nodes.push(NodeCfg { chain_monitor, logger, tx_broadcaster, fee_estimator, chan_monitor, keys_manager, node_seed: seed });
}
nodes
}
pub fn create_node_chanmgrs<'a, 'b>(node_count: usize, cfgs: &'a Vec<NodeCfg>, node_config: &[Option<UserConfig>]) -> Vec<ChannelManager<EnforcingChannelKeys, &'a TestChannelMonitor>> {
let mut chanmgrs = Vec::new();
for i in 0..node_count {
let mut default_config = UserConfig::default();
default_config.channel_options.announced_channel = true;
default_config.peer_channel_config_limits.force_announced_channel_preference = false;
let node = ChannelManager::new(Network::Testnet, cfgs[i].fee_estimator.clone(), &cfgs[i].chan_monitor, cfgs[i].tx_broadcaster.clone(), cfgs[i].logger.clone(), cfgs[i].keys_manager.clone(), if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, 0).unwrap();
chanmgrs.push(node);
}
chanmgrs
}
pub fn create_network<'a, 'b>(node_count: usize, cfgs: &'a Vec<NodeCfg>, chan_mgrs: &'b Vec<ChannelManager<EnforcingChannelKeys, &'a TestChannelMonitor>>) -> Vec<Node<'a, 'b>> {
let secp_ctx = Secp256k1::new();
let mut nodes = Vec::new();
let chan_count = Rc::new(RefCell::new(0));
let payment_count = Rc::new(RefCell::new(0));
for i in 0..node_count {
let test_logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let logger = &(Arc::clone(&test_logger) as Arc<Logger>);
let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&logger)));
let block_notifier = Arc::new(chaininterface::BlockNotifier::new(chain_monitor.clone()));
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
let mut seed = [0; 32];
rng.fill_bytes(&mut seed);
let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet, Arc::clone(&logger)));
let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), feeest.clone()));
let weak_res = Arc::downgrade(&chan_monitor.simple_monitor);
block_notifier.register_listener(weak_res);
let mut default_config = UserConfig::default();
default_config.channel_options.announced_channel = true;
default_config.peer_channel_config_limits.force_announced_channel_preference = false;
let node = ChannelManager::new(Network::Testnet, feeest.clone(), chan_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone(), if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, 0).unwrap();
let weak_res = Arc::downgrade(&node);
block_notifier.register_listener(weak_res);
let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger));
nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, keys_manager, node_seed: seed,
network_payment_count: payment_count.clone(),
network_chan_count: chan_count.clone(),
block_notifier,
logger: test_logger
});
let block_notifier = chaininterface::BlockNotifier::new(cfgs[i].chain_monitor.clone());
block_notifier.register_listener(&cfgs[i].chan_monitor.simple_monitor as &chaininterface::ChainListener);
block_notifier.register_listener(&chan_mgrs[i] as &chaininterface::ChainListener);
let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &cfgs[i].keys_manager.get_node_secret()), cfgs[i].chain_monitor.clone(), cfgs[i].logger.clone() as Arc<Logger>);
nodes.push(Node{ chain_monitor: cfgs[i].chain_monitor.clone(), block_notifier,
tx_broadcaster: cfgs[i].tx_broadcaster.clone(), chan_monitor: &cfgs[i].chan_monitor,
keys_manager: cfgs[i].keys_manager.clone(), node: &chan_mgrs[i], router,
node_seed: cfgs[i].node_seed, network_chan_count: chan_count.clone(),
network_payment_count: payment_count.clone(), logger: cfgs[i].logger.clone(),
})
}
nodes
@ -892,7 +919,7 @@ pub enum HTLCType { NONE, TIMEOUT, SUCCESS }
///
/// All broadcast transactions must be accounted for in one of the above three types of we'll
/// also fail.
pub fn test_txn_broadcast(node: &Node, chan: &(msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction), commitment_tx: Option<Transaction>, has_htlc_tx: HTLCType) -> Vec<Transaction> {
pub fn test_txn_broadcast<'a, 'b>(node: &Node<'a, 'b>, chan: &(msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction), commitment_tx: Option<Transaction>, has_htlc_tx: HTLCType) -> Vec<Transaction> {
let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
assert!(node_txn.len() >= if commitment_tx.is_some() { 0 } else { 1 } + if has_htlc_tx == HTLCType::NONE { 0 } else { 1 });
@ -937,7 +964,7 @@ pub fn test_txn_broadcast(node: &Node, chan: &(msgs::ChannelUpdate, msgs::Channe
/// Tests that the given node has broadcast a claim transaction against the provided revoked
/// HTLC transaction.
pub fn test_revoked_htlc_claim_txn_broadcast(node: &Node, revoked_tx: Transaction, commitment_revoked_tx: Transaction) {
pub fn test_revoked_htlc_claim_txn_broadcast<'a, 'b>(node: &Node<'a, 'b>, revoked_tx: Transaction, commitment_revoked_tx: Transaction) {
let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
// We should issue a 2nd transaction if one htlc is dropped from initial claiming tx
// but sometimes not as feerate is too-low
@ -955,7 +982,7 @@ pub fn test_revoked_htlc_claim_txn_broadcast(node: &Node, revoked_tx: Transactio
assert!(node_txn.is_empty());
}
pub fn check_preimage_claim(node: &Node, prev_txn: &Vec<Transaction>) -> Vec<Transaction> {
pub fn check_preimage_claim<'a, 'b>(node: &Node<'a, 'b>, prev_txn: &Vec<Transaction>) -> Vec<Transaction> {
let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
assert!(node_txn.len() >= 1);
@ -979,7 +1006,7 @@ pub fn check_preimage_claim(node: &Node, prev_txn: &Vec<Transaction>) -> Vec<Tra
res
}
pub fn get_announce_close_broadcast_events(nodes: &Vec<Node>, a: usize, b: usize) {
pub fn get_announce_close_broadcast_events<'a, 'b>(nodes: &Vec<Node<'a, 'b>>, a: usize, b: usize) {
let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
assert_eq!(events_1.len(), 1);
let as_update = match events_1[0] {
@ -1086,7 +1113,7 @@ macro_rules! handle_chan_reestablish_msgs {
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
/// for claims/fails they are separated out.
pub fn reconnect_nodes(node_a: &Node, node_b: &Node, send_funding_locked: (bool, bool), pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
pub fn reconnect_nodes<'a, 'b>(node_a: &Node<'a, 'b>, node_b: &Node<'a, 'b>, send_funding_locked: (bool, bool), pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
node_a.node.peer_connected(&node_b.node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
node_b.node.peer_connected(&node_a.node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });

File diff suppressed because it is too large Load Diff

View File

@ -10,26 +10,29 @@ use secp256k1::key::{SecretKey,PublicKey};
use ln::features::InitFeatures;
use ln::msgs;
use ln::msgs::ChannelMessageHandler;
use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use util::ser::{Writeable, Writer, Readable};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use util::byte_utils;
use util::events::{MessageSendEvent};
use util::events::{MessageSendEvent, MessageSendEventsProvider};
use util::logger::Logger;
use std::collections::{HashMap,hash_map,HashSet,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,hash,fmt};
use std::ops::Deref;
use bitcoin_hashes::sha256::Hash as Sha256;
use bitcoin_hashes::sha256::HashEngine as Sha256Engine;
use bitcoin_hashes::{HashEngine, Hash};
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler {
pub struct MessageHandler<CM: Deref> where CM::Target: msgs::ChannelMessageHandler {
/// A message handler which handles messages specific to channels. Usually this is just a
/// ChannelManager object.
pub chan_handler: Arc<msgs::ChannelMessageHandler>,
pub chan_handler: CM,
/// A message handler which handles messages updating our knowledge of the network channel
/// graph. Usually this is just a Router object.
pub route_handler: Arc<msgs::RoutingMessageHandler>,
@ -150,10 +153,31 @@ fn _check_usize_is_32_or_64() {
unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); }
}
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
pub type SimpleArcPeerManager<SD, M> = Arc<PeerManager<SD, SimpleArcChannelManager<M>>>;
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
pub type SimpleRefPeerManager<'a, SD, M> = PeerManager<SD, SimpleRefChannelManager<'a, M>>;
/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
/// events into messages which it passes on to its MessageHandlers.
pub struct PeerManager<Descriptor: SocketDescriptor> {
message_handler: MessageHandler,
///
/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager
/// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but
/// essentially you should default to using a SimpleRefPeerManager, and use a
/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
/// you're using lightning-net-tokio.
pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref> where CM::Target: msgs::ChannelMessageHandler {
message_handler: MessageHandler<CM>,
peers: Mutex<PeerHolder<Descriptor>>,
our_node_secret: SecretKey,
ephemeral_key_midstate: Sha256Engine,
@ -192,11 +216,11 @@ const INITIAL_SYNCS_TO_SEND: usize = 5;
/// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
/// PeerIds may repeat, but only after disconnect_event() has been called.
impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where CM::Target: msgs::ChannelMessageHandler {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc<Logger>) -> PeerManager<Descriptor> {
pub fn new(message_handler: MessageHandler<CM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc<Logger>) -> PeerManager<Descriptor, CM> {
let mut ephemeral_key_midstate = Sha256::engine();
ephemeral_key_midstate.input(ephemeral_random_data);
@ -1147,22 +1171,31 @@ mod tests {
fn disconnect_socket(&mut self) {}
}
fn create_network(peer_count: usize) -> Vec<PeerManager<FileDescriptor>> {
fn create_chan_handlers(peer_count: usize) -> Vec<test_utils::TestChannelMessageHandler> {
let mut chan_handlers = Vec::new();
for _ in 0..peer_count {
let chan_handler = test_utils::TestChannelMessageHandler::new();
chan_handlers.push(chan_handler);
}
chan_handlers
}
fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
let mut peers = Vec::new();
let mut rng = thread_rng();
let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
let mut ephemeral_bytes = [0; 32];
rng.fill_bytes(&mut ephemeral_bytes);
for _ in 0..peer_count {
let chan_handler = test_utils::TestChannelMessageHandler::new();
for i in 0..peer_count {
let router = test_utils::TestRoutingMessageHandler::new();
let node_id = {
let mut key_slice = [0;32];
rng.fill_bytes(&mut key_slice);
SecretKey::from_slice(&key_slice).unwrap()
};
let msg_handler = MessageHandler { chan_handler: Arc::new(chan_handler), route_handler: Arc::new(router) };
let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: Arc::new(router) };
let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger));
peers.push(peer);
}
@ -1170,7 +1203,7 @@ mod tests {
peers
}
fn establish_connection(peer_a: &PeerManager<FileDescriptor>, peer_b: &PeerManager<FileDescriptor>) {
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) {
let secp_ctx = Secp256k1::new();
let their_id = PublicKey::from_secret_key(&secp_ctx, &peer_b.our_node_secret);
let fd = FileDescriptor { fd: 1};
@ -1182,20 +1215,21 @@ mod tests {
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
// push a DisconnectPeer event to remove the node flagged by id
let mut peers = create_network(2);
let chan_handlers = create_chan_handlers(2);
let chan_handler = test_utils::TestChannelMessageHandler::new();
let mut peers = create_network(2, &chan_handlers);
establish_connection(&peers[0], &peers[1]);
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
let secp_ctx = Secp256k1::new();
let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
let chan_handler = test_utils::TestChannelMessageHandler::new();
chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
node_id: their_id,
action: msgs::ErrorAction::DisconnectPeer { msg: None },
});
assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1);
peers[0].message_handler.chan_handler = Arc::new(chan_handler);
peers[0].message_handler.chan_handler = &chan_handler;
peers[0].process_events();
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
@ -1203,7 +1237,8 @@ mod tests {
#[test]
fn test_timer_tick_occured(){
// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
let peers = create_network(2);
let chan_handlers = create_chan_handlers(2);
let peers = create_network(2, &chan_handlers);
establish_connection(&peers[0], &peers[1]);
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

View File

@ -46,7 +46,7 @@ impl chaininterface::FeeEstimator for TestFeeEstimator {
pub struct TestChannelMonitor {
pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor)>>,
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
pub simple_monitor: channelmonitor::SimpleManyChannelMonitor<OutPoint>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
}
impl TestChannelMonitor {