Avoid re-allocating to encrypt gossip messages when forwarding

When we forward gossip messages, we store them in a separate buffer
before we encrypt them (and commit to the order in which they'll
appear on the wire). Rather than storing that buffer encoded with
no headroom, requiring re-allocating to add the message length and
two MAC blocks, we here add the headroom prior to pushing it into
the gossip buffer, avoiding an allocation.
This commit is contained in:
Matt Corallo 2023-11-04 20:37:21 +00:00
parent 0503df88c7
commit 969085bf1e
3 changed files with 64 additions and 56 deletions

View file

@ -7,7 +7,7 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use lightning::ln::peer_channel_encryptor::PeerChannelEncryptor;
use lightning::ln::peer_channel_encryptor::{PeerChannelEncryptor, MessageBuf};
use lightning::util::test_utils::TestNodeSigner;
use bitcoin::secp256k1::{Secp256k1, PublicKey, SecretKey};
@ -77,7 +77,7 @@ pub fn do_test(data: &[u8]) {
let mut buf = [0; 65536 + 16];
loop {
if get_slice!(1)[0] == 0 {
crypter.encrypt_buffer(get_slice!(slice_to_be16(get_slice!(2))));
crypter.encrypt_buffer(MessageBuf::from_encoded(&get_slice!(slice_to_be16(get_slice!(2)))));
} else {
let len = match crypter.decrypt_length_header(get_slice!(16+2)) {
Ok(len) => len,

View file

@ -427,48 +427,16 @@ impl PeerChannelEncryptor {
Ok(self.their_node_id.unwrap().clone())
}
/// Encrypts the given pre-serialized message, returning the encrypted version.
/// panics if msg.len() > 65535 or Noise handshake has not finished.
pub fn encrypt_buffer(&mut self, msg: &[u8]) -> Vec<u8> {
if msg.len() > LN_MAX_MSG_LEN {
panic!("Attempted to encrypt message longer than 65535 bytes!");
}
let mut res = Vec::with_capacity(msg.len() + 16*2 + 2);
res.resize(msg.len() + 16*2 + 2, 0);
match self.noise_state {
NoiseState::Finished { ref mut sk, ref mut sn, ref mut sck, rk: _, rn: _, rck: _ } => {
if *sn >= 1000 {
let (new_sck, new_sk) = hkdf_extract_expand_twice(sck, sk);
*sck = new_sck;
*sk = new_sk;
*sn = 0;
}
Self::encrypt_with_ad(&mut res[0..16+2], *sn, sk, &[0; 0], &(msg.len() as u16).to_be_bytes());
*sn += 1;
Self::encrypt_with_ad(&mut res[16+2..], *sn, sk, &[0; 0], msg);
*sn += 1;
},
_ => panic!("Tried to encrypt a message prior to noise handshake completion"),
}
res
}
/// Encrypts the given message, returning the encrypted version.
/// panics if the length of `message`, once encoded, is greater than 65535 or if the Noise
/// handshake has not finished.
pub fn encrypt_message<M: wire::Type>(&mut self, message: &M) -> Vec<u8> {
// Allocate a buffer with 2KB, fitting most common messages. Reserve the first 16+2 bytes
// for the 2-byte message type prefix and its MAC.
let mut res = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
res.0.resize(16 + 2, 0);
wire::write(message, &mut res).expect("In-memory messages must never fail to serialize");
let msg_len = res.0.len() - 16 - 2;
/// Builds sendable bytes for a message.
///
/// `msgbuf` must begin with 16 + 2 dummy/0 bytes, which will be filled with the encrypted
/// message length and its MAC. It should then be followed by the message bytes themselves
/// (including the two byte message type).
///
/// For effeciency, the [`Vec::capacity`] should be at least 16 bytes larger than the
/// [`Vec::len`], to avoid reallocating for the message MAC, which will be appended to the vec.
fn encrypt_message_with_header_0s(&mut self, msgbuf: &mut Vec<u8>) {
let msg_len = msgbuf.len() - 16 - 2;
if msg_len > LN_MAX_MSG_LEN {
panic!("Attempted to encrypt message longer than 65535 bytes!");
}
@ -482,15 +450,34 @@ impl PeerChannelEncryptor {
*sn = 0;
}
Self::encrypt_with_ad(&mut res.0[0..16+2], *sn, sk, &[0; 0], &(msg_len as u16).to_be_bytes());
Self::encrypt_with_ad(&mut msgbuf[0..16+2], *sn, sk, &[0; 0], &(msg_len as u16).to_be_bytes());
*sn += 1;
Self::encrypt_in_place_with_ad(&mut res.0, 16+2, *sn, sk, &[0; 0]);
Self::encrypt_in_place_with_ad(msgbuf, 16+2, *sn, sk, &[0; 0]);
*sn += 1;
},
_ => panic!("Tried to encrypt a message prior to noise handshake completion"),
}
}
/// Encrypts the given pre-serialized message, returning the encrypted version.
/// panics if msg.len() > 65535 or Noise handshake has not finished.
pub fn encrypt_buffer(&mut self, mut msg: MessageBuf) -> Vec<u8> {
self.encrypt_message_with_header_0s(&mut msg.0);
msg.0
}
/// Encrypts the given message, returning the encrypted version.
/// panics if the length of `message`, once encoded, is greater than 65535 or if the Noise
/// handshake has not finished.
pub fn encrypt_message<M: wire::Type>(&mut self, message: &M) -> Vec<u8> {
// Allocate a buffer with 2KB, fitting most common messages. Reserve the first 16+2 bytes
// for the 2-byte message type prefix and its MAC.
let mut res = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
res.0.resize(16 + 2, 0);
wire::write(message, &mut res).expect("In-memory messages must never fail to serialize");
self.encrypt_message_with_header_0s(&mut res.0);
res.0
}
@ -557,9 +544,30 @@ impl PeerChannelEncryptor {
}
}
/// A buffer which stores an encoded message (including the two message-type bytes) with some
/// padding to allow for future encryption/MACing.
pub struct MessageBuf(Vec<u8>);
impl MessageBuf {
/// Creates a new buffer from an encoded message (i.e. the two message-type bytes followed by
/// the message contents).
///
/// Panics if the message is longer than 2^16.
pub fn from_encoded(encoded_msg: &[u8]) -> Self {
if encoded_msg.len() > LN_MAX_MSG_LEN {
panic!("Attempted to encrypt message longer than 65535 bytes!");
}
// In addition to the message (continaing the two message type bytes), we also have to add
// the message length header (and its MAC) and the message MAC.
let mut res = Vec::with_capacity(encoded_msg.len() + 16*2 + 2);
res.resize(encoded_msg.len() + 16 + 2, 0);
res[16 + 2..].copy_from_slice(&encoded_msg);
Self(res)
}
}
#[cfg(test)]
mod tests {
use super::LN_MAX_MSG_LEN;
use super::{MessageBuf, LN_MAX_MSG_LEN};
use bitcoin::secp256k1::{PublicKey, SecretKey};
use bitcoin::secp256k1::Secp256k1;
@ -775,7 +783,7 @@ mod tests {
for i in 0..1005 {
let msg = [0x68, 0x65, 0x6c, 0x6c, 0x6f];
let mut res = outbound_peer.encrypt_buffer(&msg);
let mut res = outbound_peer.encrypt_buffer(MessageBuf::from_encoded(&msg));
assert_eq!(res.len(), 5 + 2*16 + 2);
let len_header = res[0..2+16].to_vec();
@ -811,7 +819,7 @@ mod tests {
fn max_message_len_encryption() {
let mut outbound_peer = get_outbound_peer_for_initiator_test_vectors();
let msg = [4u8; LN_MAX_MSG_LEN + 1];
outbound_peer.encrypt_buffer(&msg);
outbound_peer.encrypt_buffer(MessageBuf::from_encoded(&msg));
}
#[test]

View file

@ -27,7 +27,7 @@ use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, Onio
#[cfg(not(c_bindings))]
use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use crate::util::ser::{VecWriter, Writeable, Writer};
use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MSG_BUF_ALLOC_SIZE};
use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
use crate::ln::wire;
use crate::ln::wire::{Encode, Type};
#[cfg(not(c_bindings))]
@ -495,7 +495,7 @@ struct Peer {
/// prioritize channel messages over them.
///
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
gossip_broadcast_buffer: VecDeque<Vec<u8>>,
gossip_broadcast_buffer: VecDeque<MessageBuf>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
@ -1102,7 +1102,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
if peer.should_buffer_gossip_broadcast() {
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(&msg[..]));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(msg));
}
}
if peer.should_buffer_gossip_backfill() {
@ -1251,7 +1251,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
peer.msgs_sent_since_pong += 1;
debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
peer.gossip_broadcast_buffer.push_back(encoded_message);
@ -1800,7 +1800,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
wire::Message::NodeAnnouncement(ref msg) => {
@ -1827,7 +1827,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
wire::Message::ChannelUpdate(ref msg) => {
@ -1849,7 +1849,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),