Introduce graph sync crate for fast-forwarding through gossip data downloaded from a server.

This commit is contained in:
Arik Sosman 2021-11-03 10:50:08 -07:00
parent 75ca50f5c0
commit a58ae4c97b
No known key found for this signature in database
GPG Key ID: F4FB5A3366C4D92E
19 changed files with 1185 additions and 64 deletions

View File

@ -3,3 +3,4 @@
[*] [*]
indent_style = tab indent_style = tab
insert_final_newline = true insert_final_newline = true
trim_trailing_whitespace = true

View File

@ -141,6 +141,7 @@ jobs:
run: | run: |
cargo test --verbose --color always -p lightning cargo test --verbose --color always -p lightning
cargo test --verbose --color always -p lightning-invoice cargo test --verbose --color always -p lightning-invoice
cargo test --verbose --color always -p lightning-rapid-gossip-sync
cargo build --verbose --color always -p lightning-persister cargo build --verbose --color always -p lightning-persister
cargo build --verbose --color always -p lightning-background-processor cargo build --verbose --color always -p lightning-background-processor
- name: Test C Bindings Modifications on Rust ${{ matrix.toolchain }} - name: Test C Bindings Modifications on Rust ${{ matrix.toolchain }}
@ -221,11 +222,24 @@ jobs:
- name: Fetch routing graph snapshot - name: Fetch routing graph snapshot
if: steps.cache-graph.outputs.cache-hit != 'true' if: steps.cache-graph.outputs.cache-hit != 'true'
run: | run: |
wget -O lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin curl --verbose -L -o lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin
if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303" ]; then echo "Sha sum: $(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')"
if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "${EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM}" ]; then
echo "Bad hash" echo "Bad hash"
exit 1 exit 1
fi fi
env:
EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM: 05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303
- name: Fetch rapid graph sync reference input
run: |
curl --verbose -L -o lightning-rapid-gossip-sync/res/full_graph.lngossip https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin
echo "Sha sum: $(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')"
if [ "$(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')" != "${EXPECTED_RAPID_GOSSIP_SHASUM}" ]; then
echo "Bad hash"
exit 1
fi
env:
EXPECTED_RAPID_GOSSIP_SHASUM: 9637b91cea9d64320cf48fc0787c70fe69fc062f90d3512e207044110cadfd7b
- name: Test with Network Graph on Rust ${{ matrix.toolchain }} - name: Test with Network Graph on Rust ${{ matrix.toolchain }}
run: | run: |
cd lightning cd lightning

View File

@ -7,6 +7,7 @@ members = [
"lightning-net-tokio", "lightning-net-tokio",
"lightning-persister", "lightning-persister",
"lightning-background-processor", "lightning-background-processor",
"lightning-rapid-gossip-sync"
] ]
exclude = [ exclude = [

View File

@ -19,6 +19,7 @@ stdin_fuzz = []
[dependencies] [dependencies]
afl = { version = "0.4", optional = true } afl = { version = "0.4", optional = true }
lightning = { path = "../lightning", features = ["regex"] } lightning = { path = "../lightning", features = ["regex"] }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" }
bitcoin = { version = "0.28.1", features = ["secp-lowmemory"] } bitcoin = { version = "0.28.1", features = ["secp-lowmemory"] }
hex = "0.3" hex = "0.3"
honggfuzz = { version = "0.5", optional = true } honggfuzz = { version = "0.5", optional = true }

View File

@ -10,6 +10,7 @@ GEN_TEST chanmon_deser
GEN_TEST chanmon_consistency GEN_TEST chanmon_consistency
GEN_TEST full_stack GEN_TEST full_stack
GEN_TEST peer_crypt GEN_TEST peer_crypt
GEN_TEST process_network_graph
GEN_TEST router GEN_TEST router
GEN_TEST zbase32 GEN_TEST zbase32

View File

@ -0,0 +1,113 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
// This file is auto-generated by gen_target.sh based on target_template.txt
// To modify it, modify target_template.txt and run gen_target.sh instead.
#![cfg_attr(feature = "libfuzzer_fuzz", no_main)]
#[cfg(not(fuzzing))]
compile_error!("Fuzz targets need cfg=fuzzing");
extern crate lightning_fuzz;
use lightning_fuzz::process_network_graph::*;
#[cfg(feature = "afl")]
#[macro_use] extern crate afl;
#[cfg(feature = "afl")]
fn main() {
fuzz!(|data| {
process_network_graph_run(data.as_ptr(), data.len());
});
}
#[cfg(feature = "honggfuzz")]
#[macro_use] extern crate honggfuzz;
#[cfg(feature = "honggfuzz")]
fn main() {
loop {
fuzz!(|data| {
process_network_graph_run(data.as_ptr(), data.len());
});
}
}
#[cfg(feature = "libfuzzer_fuzz")]
#[macro_use] extern crate libfuzzer_sys;
#[cfg(feature = "libfuzzer_fuzz")]
fuzz_target!(|data: &[u8]| {
process_network_graph_run(data.as_ptr(), data.len());
});
#[cfg(feature = "stdin_fuzz")]
fn main() {
use std::io::Read;
let mut data = Vec::with_capacity(8192);
std::io::stdin().read_to_end(&mut data).unwrap();
process_network_graph_run(data.as_ptr(), data.len());
}
#[test]
fn run_test_cases() {
use std::fs;
use std::io::Read;
use lightning_fuzz::utils::test_logger::StringBuffer;
use std::sync::{atomic, Arc};
{
let data: Vec<u8> = vec![0];
process_network_graph_run(data.as_ptr(), data.len());
}
let mut threads = Vec::new();
let threads_running = Arc::new(atomic::AtomicUsize::new(0));
if let Ok(tests) = fs::read_dir("test_cases/process_network_graph") {
for test in tests {
let mut data: Vec<u8> = Vec::new();
let path = test.unwrap().path();
fs::File::open(&path).unwrap().read_to_end(&mut data).unwrap();
threads_running.fetch_add(1, atomic::Ordering::AcqRel);
let thread_count_ref = Arc::clone(&threads_running);
let main_thread_ref = std::thread::current();
threads.push((path.file_name().unwrap().to_str().unwrap().to_string(),
std::thread::spawn(move || {
let string_logger = StringBuffer::new();
let panic_logger = string_logger.clone();
let res = if ::std::panic::catch_unwind(move || {
process_network_graph_test(&data, panic_logger);
}).is_err() {
Some(string_logger.into_string())
} else { None };
thread_count_ref.fetch_sub(1, atomic::Ordering::AcqRel);
main_thread_ref.unpark();
res
})
));
while threads_running.load(atomic::Ordering::Acquire) > 32 {
std::thread::park();
}
}
}
let mut failed_outputs = Vec::new();
for (test, thread) in threads.drain(..) {
if let Some(output) = thread.join().unwrap() {
println!("\nOutput of {}:\n{}\n", test, output);
failed_outputs.push(test);
}
}
if !failed_outputs.is_empty() {
println!("Test cases which failed: ");
for case in failed_outputs {
println!("{}", case);
}
panic!();
}
}

View File

@ -9,6 +9,7 @@
extern crate bitcoin; extern crate bitcoin;
extern crate lightning; extern crate lightning;
extern crate lightning_rapid_gossip_sync;
extern crate hex; extern crate hex;
pub mod utils; pub mod utils;
@ -17,6 +18,7 @@ pub mod chanmon_deser;
pub mod chanmon_consistency; pub mod chanmon_consistency;
pub mod full_stack; pub mod full_stack;
pub mod peer_crypt; pub mod peer_crypt;
pub mod process_network_graph;
pub mod router; pub mod router;
pub mod zbase32; pub mod zbase32;

View File

@ -0,0 +1,20 @@
// Import that needs to be added manually
use utils::test_logger;
/// Actual fuzz test, method signature and name are fixed
fn do_test(data: &[u8]) {
let block_hash = bitcoin::BlockHash::default();
let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash);
lightning_rapid_gossip_sync::processing::update_network_graph(&network_graph, data);
}
/// Method that needs to be added manually, {name}_test
pub fn process_network_graph_test<Out: test_logger::Output>(data: &[u8], _out: Out) {
do_test(data);
}
/// Method that needs to be added manually, {name}_run
#[no_mangle]
pub extern "C" fn process_network_graph_run(data: *const u8, datalen: usize) {
do_test(unsafe { std::slice::from_raw_parts(data, datalen) });
}

View File

@ -3,6 +3,7 @@ void chanmon_deser_run(const unsigned char* data, size_t data_len);
void chanmon_consistency_run(const unsigned char* data, size_t data_len); void chanmon_consistency_run(const unsigned char* data, size_t data_len);
void full_stack_run(const unsigned char* data, size_t data_len); void full_stack_run(const unsigned char* data, size_t data_len);
void peer_crypt_run(const unsigned char* data, size_t data_len); void peer_crypt_run(const unsigned char* data, size_t data_len);
void process_network_graph_run(const unsigned char* data, size_t data_len);
void router_run(const unsigned char* data, size_t data_len); void router_run(const unsigned char* data, size_t data_len);
void zbase32_run(const unsigned char* data, size_t data_len); void zbase32_run(const unsigned char* data, size_t data_len);
void msg_accept_channel_run(const unsigned char* data, size_t data_len); void msg_accept_channel_run(const unsigned char* data, size_t data_len);

View File

@ -0,0 +1,20 @@
[package]
name = "lightning-rapid-gossip-sync"
version = "0.0.104"
authors = ["Arik Sosman <git@arik.io>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/lightningdevkit/rust-lightning"
edition = "2018"
description = """
Utility to process gossip routing data from Rapid Gossip Sync Server.
"""
[features]
_bench_unstable = []
[dependencies]
lightning = { version = "0.0.106", path = "../lightning" }
bitcoin = { version = "0.28.1", default-features = false }
[dev-dependencies]
lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }

View File

@ -0,0 +1,120 @@
# lightning-rapid-gossip-sync
This crate exposes functionality for rapid gossip graph syncing, aimed primarily at mobile clients.
Its server counterpart is the
[rapid-gossip-sync-server](https://github.com/lightningdevkit/rapid-gossip-sync-server) repository.
## Mechanism
The (presumed) server sends a compressed gossip response containing gossip data. The gossip data is
formatted compactly, omitting signatures and opportunistically incremental where previous channel
updates are known.
Essentially, the serialization structure is as follows:
1. Fixed prefix bytes `76, 68, 75, 1` (the first three bytes are ASCII for `LDK`)
- The purpose of this prefix is to identify the serialization format, should other rapid gossip
sync formats arise in the future
- The fourth byte is the protocol version in case our format gets updated
2. Chain hash (32 bytes)
3. Latest seen timestamp (`u32`)
4. An unsigned int indicating the number of node IDs to follow
5. An array of compressed node ID pubkeys (all pubkeys are presumed to be standard
compressed 33-byte-serializations)
6. An unsigned int indicating the number of channel announcement messages to follow
7. An array of significantly stripped down customized channel announcements
8. An unsigned int indicating the number of channel update messages to follow
9. A series of default values used for non-incremental channel updates
- The values are defined as follows:
1. `default_cltv_expiry_delta`
2. `default_htlc_minimum_msat`
3. `default_fee_base_msat`
4. `default_fee_proportional_millionths`
5. `default_htlc_maximum_msat` (`u64`, and if the default is no maximum, `u64::MAX`)
- The defaults are calculated by the server based on the frequency among non-incremental
updates within a given delta set
10. An array of customized channel updates
You will also notice that `NodeAnnouncement` messages are omitted altogether as the node IDs are
implicitly extracted from the channel announcements and updates.
The data is then applied to the current network graph, artificially dated to the timestamp of the
latest seen message less one week, be it an announcement or an update, from the server's
perspective. The network graph should not be pruned until the graph sync completes.
### Custom Channel Announcement
To achieve compactness and avoid data repetition, we're sending a significantly stripped down
version of the channel announcement message, which contains only the following data:
1. `channel_features`: `u16` + `n`, where `n` is the number of bytes indicated by the first `u16`
2. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting from 0)
3. `node_id_1_index`: `CompactSize` (index of node id within the previously sent sequence)
4. `node_id_2_index`: `CompactSize` (index of node id within the previously sent sequence)
### Custom Channel Update
For the purpose of rapid syncing, we have deviated from the channel update format specified in
BOLT 7 significantly. Our custom channel updates are structured as follows:
1. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting at 0)
2. `custom_channel_flags`: `u8`
3. `update_data`
Specifically, our custom channel flags break down like this:
| 128 | 64 | 32 | 16 | 8 | 4 | 2 | 1 |
|---------------------|----|----|----|---|---|------------------|-----------|
| Incremental update? | | | | | | Disable channel? | Direction |
If the most significant bit is set to `1`, indicating an incremental update, the intermediate bit
flags assume the following meaning:
| 64 | 32 | 16 | 8 | 4 |
|---------------------------------|---------------------------------|-----------------------------|-------------------------------------------|---------------------------------|
| `cltv_expiry_delta` has changed | `htlc_minimum_msat` has changed | `fee_base_msat` has changed | `fee_proportional_millionths` has changed | `htlc_maximum_msat` has changed |
If the most significant bit is set to `0`, the meaning is almost identical, except instead of a
change, the flags now represent a deviation from the defaults sent at the beginning of the update
sequence.
In both cases, `update_data` only contains the fields that are indicated by the channel flags to be
non-default or to have mutated.
## Delta Calculation
The way a server is meant to calculate this rapid gossip sync data is by taking the latest time
any change, be it either an announcement or an update, was seen. That timestamp is included in each
rapid sync message, so all the client needs to do is cache one variable.
If a particular channel update had never occurred before, the full update is sent. If a channel has
had updates prior to the provided timestamp, the latest update prior to the timestamp is taken as a
reference, and the delta is calculated against it.
Depending on whether the rapid sync message is calculated on the fly or a snapshotted version is
returned, intermediate changes between the latest update seen by the client and the latest update
broadcast on the network may be taken into account when calculating the delta.
## Performance
Given the primary purpose of this utility is a faster graph sync, we thought it might be helpful to
provide some examples of various delta sets. These examples were calculated as of May 19th 2022
with a network graph comprised of 80,000 channel announcements and 160,000 directed channel updates.
| Full sync | |
|-----------------------------|--------|
| Message Length | 4.7 MB |
| Gzipped Message Length | 2.0 MB |
| Client-side Processing Time | 1.4 s |
| Week-old sync | |
|-----------------------------|--------|
| Message Length | 2.7 MB |
| Gzipped Message Length | 862 kB |
| Client-side Processing Time | 907 ms |
| Day-old sync | |
|-----------------------------|---------|
| Message Length | 191 kB |
| Gzipped Message Length | 92.8 kB |
| Client-side Processing Time | 196 ms |

View File

@ -0,0 +1,2 @@
*
!.gitignore

View File

@ -0,0 +1,40 @@
use core::fmt::Debug;
use std::fmt::Formatter;
use lightning::ln::msgs::{DecodeError, LightningError};
/// All-encompassing standard error type that processing can return
pub enum GraphSyncError {
/// Error trying to read the update data, typically due to an erroneous data length indication
/// that is greater than the actual amount of data provided
DecodeError(DecodeError),
/// Error applying the patch to the network graph, usually the result of updates that are too
/// old or missing prerequisite data to the application of updates out of order
LightningError(LightningError),
}
impl From<std::io::Error> for GraphSyncError {
fn from(error: std::io::Error) -> Self {
Self::DecodeError(DecodeError::Io(error.kind()))
}
}
impl From<DecodeError> for GraphSyncError {
fn from(error: DecodeError) -> Self {
Self::DecodeError(error)
}
}
impl From<LightningError> for GraphSyncError {
fn from(error: LightningError) -> Self {
Self::LightningError(error)
}
}
impl Debug for GraphSyncError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GraphSyncError::DecodeError(e) => f.write_fmt(format_args!("DecodeError: {:?}", e)),
GraphSyncError::LightningError(e) => f.write_fmt(format_args!("LightningError: {:?}", e))
}
}
}

View File

@ -0,0 +1,243 @@
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![deny(broken_intra_doc_links)]
#![deny(non_upper_case_globals)]
#![deny(non_camel_case_types)]
#![deny(non_snake_case)]
#![deny(unused_mut)]
#![deny(unused_variables)]
#![deny(unused_imports)]
//! This crate exposes functionality to rapidly sync gossip data, aimed primarily at mobile
//! devices.
//!
//! The server sends a compressed response containing differential gossip data. The gossip data is
//! formatted compactly, omitting signatures and opportunistically incremental where previous
//! channel updates are known (a mechanism that is enabled when the timestamp of the last known
//! channel update is communicated). A reference server implementation can be found
//! [here](https://github.com/lightningdevkit/rapid-gossip-sync-server).
//!
//! An example server request could look as simple as the following. Note that the first ever rapid
//! sync should use `0` for `last_sync_timestamp`:
//!
//! ```shell
//! curl -o rapid_sync.lngossip https://rapidsync.lightningdevkit.org/snapshot/<last_sync_timestamp>
//! ```
//!
//! Then, call the network processing function. In this example, we process the update by reading
//! its contents from disk, which we do by calling the `sync_network_graph_with_file_path` method:
//!
//! ```
//! use bitcoin::blockdata::constants::genesis_block;
//! use bitcoin::Network;
//! use lightning::routing::network_graph::NetworkGraph;
//!
//! let block_hash = genesis_block(Network::Bitcoin).header.block_hash();
//! let network_graph = NetworkGraph::new(block_hash);
//! let new_last_sync_timestamp_result = lightning_rapid_gossip_sync::sync_network_graph_with_file_path(&network_graph, "./rapid_sync.lngossip");
//! ```
//!
//! The primary benefit this syncing mechanism provides is that given a trusted server, a
//! low-powered client can offload the validation of gossip signatures. This enables a client to
//! privately calculate routes for payments, and do so much faster and earlier than requiring a full
//! peer-to-peer gossip sync to complete.
//!
//! The reason the rapid sync server requires trust is that it could provide bogus data, though at
//! worst, all that would result in is a fake network topology, which wouldn't enable the server to
//! steal or siphon off funds. It could, however, reduce the client's privacy by forcing all
//! payments to be routed via channels the server controls.
//!
//! The way a server is meant to calculate this rapid gossip sync data is by using a `latest_seen`
//! timestamp provided by the client. It's not included in either channel announcement or update,
//! (not least due to announcements not including any timestamps at all, but only a block height)
//! but rather, it's a timestamp of when the server saw a particular message.
// Allow and import test features for benching
#![cfg_attr(all(test, feature = "_bench_unstable"), feature(test))]
#[cfg(all(test, feature = "_bench_unstable"))]
extern crate test;
use std::fs::File;
use lightning::routing::network_graph;
use crate::error::GraphSyncError;
/// Error types that these functions can return
pub mod error;
/// Core functionality of this crate
pub mod processing;
/// Sync gossip data from a file
/// Returns the last sync timestamp to be used the next time rapid sync data is queried.
///
/// `network_graph`: The network graph to apply the updates to
///
/// `sync_path`: Path to the file where the gossip update data is located
///
pub fn sync_network_graph_with_file_path(
network_graph: &network_graph::NetworkGraph,
sync_path: &str,
) -> Result<u32, GraphSyncError> {
let mut file = File::open(sync_path)?;
processing::update_network_graph_from_byte_stream(&network_graph, &mut file)
}
#[cfg(test)]
mod tests {
use std::fs;
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::Network;
use lightning::ln::msgs::DecodeError;
use lightning::routing::network_graph::NetworkGraph;
use crate::sync_network_graph_with_file_path;
#[test]
fn test_sync_from_file() {
struct FileSyncTest {
directory: String,
}
impl FileSyncTest {
fn new(tmp_directory: &str, valid_response: &[u8]) -> FileSyncTest {
let test = FileSyncTest { directory: tmp_directory.to_owned() };
let graph_sync_test_directory = test.get_test_directory();
fs::create_dir_all(graph_sync_test_directory).unwrap();
let graph_sync_test_file = test.get_test_file_path();
fs::write(&graph_sync_test_file, valid_response).unwrap();
test
}
fn get_test_directory(&self) -> String {
let graph_sync_test_directory = self.directory.clone() + "/graph-sync-tests";
graph_sync_test_directory
}
fn get_test_file_path(&self) -> String {
let graph_sync_test_directory = self.get_test_directory();
let graph_sync_test_file = graph_sync_test_directory.to_owned() + "/test_data.lngossip";
graph_sync_test_file
}
}
impl Drop for FileSyncTest {
fn drop(&mut self) {
fs::remove_dir_all(self.directory.clone()).unwrap();
}
}
// same as incremental_only_update_fails_without_prior_same_direction_updates
let valid_response = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
];
let tmp_directory = "./rapid-gossip-sync-tests-tmp";
let sync_test = FileSyncTest::new(tmp_directory, &valid_response);
let graph_sync_test_file = sync_test.get_test_file_path();
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let sync_result = sync_network_graph_with_file_path(&network_graph, &graph_sync_test_file);
if sync_result.is_err() {
panic!("Unexpected sync result: {:?}", sync_result)
}
assert_eq!(network_graph.read_only().channels().len(), 2);
let after = network_graph.to_string();
assert!(
after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
);
assert!(
after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
);
assert!(
after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
);
assert!(
after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
);
assert!(after.contains("619737530008010752"));
assert!(after.contains("783241506229452801"));
}
#[test]
fn measure_native_read_from_file() {
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let start = std::time::Instant::now();
let sync_result =
sync_network_graph_with_file_path(&network_graph, "./res/full_graph.lngossip");
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
#[cfg(not(require_route_graph_test))]
{
println!("{}", error_string);
return;
}
panic!("{}", error_string);
}
let elapsed = start.elapsed();
println!("initialization duration: {:?}", elapsed);
if sync_result.is_err() {
panic!("Unexpected sync result: {:?}", sync_result)
}
}
}
#[cfg(all(test, feature = "_bench_unstable"))]
pub mod bench {
use test::Bencher;
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::Network;
use lightning::ln::msgs::DecodeError;
use lightning::routing::network_graph::NetworkGraph;
use crate::sync_network_graph_with_file_path;
#[bench]
fn bench_reading_full_graph_from_file(b: &mut Bencher) {
let block_hash = genesis_block(Network::Bitcoin).block_hash();
b.iter(|| {
let network_graph = NetworkGraph::new(block_hash);
let sync_result = sync_network_graph_with_file_path(
&network_graph,
"./res/full_graph.lngossip",
);
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
#[cfg(not(require_route_graph_test))]
{
println!("{}", error_string);
return;
}
panic!("{}", error_string);
}
assert!(sync_result.is_ok())
});
}
}

View File

@ -0,0 +1,499 @@
use std::cmp::max;
use std::io;
use std::io::Read;
use bitcoin::BlockHash;
use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::{
DecodeError, ErrorAction, LightningError, OptionalField, UnsignedChannelUpdate,
};
use lightning::routing::network_graph;
use lightning::util::ser::{BigSize, Readable};
use crate::error::GraphSyncError;
/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
/// Maximum vector allocation capacity for distinct node IDs. This constraint is necessary to
/// avoid malicious updates being able to trigger excessive memory allocation.
const MAX_INITIAL_NODE_ID_VECTOR_CAPACITY: u32 = 50_000;
/// Update network graph from binary data.
/// Returns the last sync timestamp to be used the next time rapid sync data is queried.
///
/// `network_graph`: network graph to be updated
///
/// `update_data`: `&[u8]` binary stream that comprises the update data
pub fn update_network_graph(
network_graph: &network_graph::NetworkGraph,
update_data: &[u8],
) -> Result<u32, GraphSyncError> {
let mut read_cursor = io::Cursor::new(update_data);
update_network_graph_from_byte_stream(&network_graph, &mut read_cursor)
}
pub(crate) fn update_network_graph_from_byte_stream<R: Read>(
network_graph: &network_graph::NetworkGraph,
mut read_cursor: &mut R,
) -> Result<u32, GraphSyncError> {
let mut prefix = [0u8; 4];
read_cursor.read_exact(&mut prefix)?;
match prefix {
GOSSIP_PREFIX => {},
_ => {
return Err(DecodeError::UnknownVersion.into());
}
};
let chain_hash: BlockHash = Readable::read(read_cursor)?;
let latest_seen_timestamp: u32 = Readable::read(read_cursor)?;
// backdate the applied timestamp by a week
let backdated_timestamp = latest_seen_timestamp.saturating_sub(24 * 3600 * 7);
let node_id_count: u32 = Readable::read(read_cursor)?;
let mut node_ids: Vec<PublicKey> = Vec::with_capacity(std::cmp::min(
node_id_count,
MAX_INITIAL_NODE_ID_VECTOR_CAPACITY,
) as usize);
for _ in 0..node_id_count {
let current_node_id = Readable::read(read_cursor)?;
node_ids.push(current_node_id);
}
let mut previous_scid: u64 = 0;
let announcement_count: u32 = Readable::read(read_cursor)?;
for _ in 0..announcement_count {
let features = Readable::read(read_cursor)?;
// handle SCID
let scid_delta: BigSize = Readable::read(read_cursor)?;
let short_channel_id = previous_scid
.checked_add(scid_delta.0)
.ok_or(DecodeError::InvalidValue)?;
previous_scid = short_channel_id;
let node_id_1_index: BigSize = Readable::read(read_cursor)?;
let node_id_2_index: BigSize = Readable::read(read_cursor)?;
if max(node_id_1_index.0, node_id_2_index.0) >= node_id_count as u64 {
return Err(DecodeError::InvalidValue.into());
};
let node_id_1 = node_ids[node_id_1_index.0 as usize];
let node_id_2 = node_ids[node_id_2_index.0 as usize];
let announcement_result = network_graph.add_channel_from_partial_announcement(
short_channel_id,
backdated_timestamp as u64,
features,
node_id_1,
node_id_2,
);
if let Err(lightning_error) = announcement_result {
if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action {
// everything is fine, just a duplicate channel announcement
} else {
return Err(lightning_error.into());
}
}
}
previous_scid = 0; // updates start at a new scid
let update_count: u32 = Readable::read(read_cursor)?;
if update_count == 0 {
return Ok(latest_seen_timestamp);
}
// obtain default values for non-incremental updates
let default_cltv_expiry_delta: u16 = Readable::read(&mut read_cursor)?;
let default_htlc_minimum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_fee_base_msat: u32 = Readable::read(&mut read_cursor)?;
let default_fee_proportional_millionths: u32 = Readable::read(&mut read_cursor)?;
let tentative_default_htlc_maximum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_htlc_maximum_msat = if tentative_default_htlc_maximum_msat == u64::max_value() {
OptionalField::Absent
} else {
OptionalField::Present(tentative_default_htlc_maximum_msat)
};
for _ in 0..update_count {
let scid_delta: BigSize = Readable::read(read_cursor)?;
let short_channel_id = previous_scid
.checked_add(scid_delta.0)
.ok_or(DecodeError::InvalidValue)?;
previous_scid = short_channel_id;
let channel_flags: u8 = Readable::read(read_cursor)?;
// flags are always sent in full, and hence always need updating
let standard_channel_flags = channel_flags & 0b_0000_0011;
let mut synthetic_update = if channel_flags & 0b_1000_0000 == 0 {
// full update, field flags will indicate deviations from the default
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: default_cltv_expiry_delta,
htlc_minimum_msat: default_htlc_minimum_msat,
htlc_maximum_msat: default_htlc_maximum_msat.clone(),
fee_base_msat: default_fee_base_msat,
fee_proportional_millionths: default_fee_proportional_millionths,
excess_data: vec![],
}
} else {
// incremental update, field flags will indicate mutated values
let read_only_network_graph = network_graph.read_only();
let channel = read_only_network_graph
.channels()
.get(&short_channel_id)
.ok_or(LightningError {
err: "Couldn't find channel for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let directional_info = channel
.get_directional_info(channel_flags)
.ok_or(LightningError {
err: "Couldn't find previous directional data for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let htlc_maximum_msat =
if let Some(htlc_maximum_msat) = directional_info.htlc_maximum_msat {
OptionalField::Present(htlc_maximum_msat)
} else {
OptionalField::Absent
};
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: directional_info.cltv_expiry_delta,
htlc_minimum_msat: directional_info.htlc_minimum_msat,
htlc_maximum_msat,
fee_base_msat: directional_info.fees.base_msat,
fee_proportional_millionths: directional_info.fees.proportional_millionths,
excess_data: vec![],
}
};
if channel_flags & 0b_0100_0000 > 0 {
let cltv_expiry_delta: u16 = Readable::read(read_cursor)?;
synthetic_update.cltv_expiry_delta = cltv_expiry_delta;
}
if channel_flags & 0b_0010_0000 > 0 {
let htlc_minimum_msat: u64 = Readable::read(read_cursor)?;
synthetic_update.htlc_minimum_msat = htlc_minimum_msat;
}
if channel_flags & 0b_0001_0000 > 0 {
let fee_base_msat: u32 = Readable::read(read_cursor)?;
synthetic_update.fee_base_msat = fee_base_msat;
}
if channel_flags & 0b_0000_1000 > 0 {
let fee_proportional_millionths: u32 = Readable::read(read_cursor)?;
synthetic_update.fee_proportional_millionths = fee_proportional_millionths;
}
if channel_flags & 0b_0000_0100 > 0 {
let tentative_htlc_maximum_msat: u64 = Readable::read(read_cursor)?;
synthetic_update.htlc_maximum_msat = if tentative_htlc_maximum_msat == u64::max_value()
{
OptionalField::Absent
} else {
OptionalField::Present(tentative_htlc_maximum_msat)
};
}
network_graph.update_channel_unsigned(&synthetic_update)?;
}
Ok(latest_seen_timestamp)
}
#[cfg(test)]
mod tests {
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::Network;
use lightning::ln::msgs::DecodeError;
use lightning::routing::network_graph::NetworkGraph;
use crate::error::GraphSyncError;
use crate::processing::update_network_graph;
#[test]
fn network_graph_fails_to_update_from_clipped_input() {
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
let example_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 0, 100,
0, 0, 2, 224, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 36, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0,
0, 3, 232, 0, 0, 0,
];
let update_result = update_network_graph(&network_graph, &example_input[..]);
assert!(update_result.is_err());
if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result {
// this is the expected error type
} else {
panic!("Unexpected update result: {:?}", update_result)
}
}
#[test]
fn incremental_only_update_fails_without_prior_announcements() {
let incremental_update_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
68, 226, 0, 6, 11, 0, 1, 128,
];
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &incremental_update_input[..]);
assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!(lightning_error.err, "Couldn't find channel for update");
} else {
panic!("Unexpected update result: {:?}", update_result)
}
}
#[test]
fn incremental_only_update_fails_without_prior_updates() {
let announced_update_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255,
2, 68, 226, 0, 6, 11, 0, 1, 128,
];
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &announced_update_input[..]);
assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!(
lightning_error.err,
"Couldn't find previous directional data for update"
);
} else {
panic!("Unexpected update result: {:?}", update_result)
}
}
#[test]
fn incremental_only_update_fails_without_prior_same_direction_updates() {
let initialization_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
];
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let initialization_result = update_network_graph(&network_graph, &initialization_input[..]);
if initialization_result.is_err() {
panic!(
"Unexpected initialization result: {:?}",
initialization_result
)
}
assert_eq!(network_graph.read_only().channels().len(), 2);
let initialized = network_graph.to_string();
assert!(initialized
.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643"));
assert!(initialized
.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b"));
assert!(initialized
.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432"));
assert!(initialized
.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61"));
assert!(initialized.contains("619737530008010752"));
assert!(initialized.contains("783241506229452801"));
let opposite_direction_incremental_update_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
68, 226, 0, 6, 11, 0, 1, 128,
];
let update_result = update_network_graph(
&network_graph,
&opposite_direction_incremental_update_input[..],
);
assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!(
lightning_error.err,
"Couldn't find previous directional data for update"
);
} else {
panic!("Unexpected update result: {:?}", update_result)
}
}
#[test]
fn incremental_update_succeeds_with_prior_announcements_and_full_updates() {
let initialization_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 56, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2,
68, 226, 0, 6, 11, 0, 1, 4, 0, 0, 0, 0, 29, 129, 25, 192, 0, 5, 0, 0, 0, 0, 29, 129,
25, 192,
];
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let initialization_result = update_network_graph(&network_graph, &initialization_input[..]);
assert!(initialization_result.is_ok());
let single_direction_incremental_update_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
68, 226, 0, 6, 11, 0, 1, 128,
];
let update_result = update_network_graph(
&network_graph,
&single_direction_incremental_update_input[..],
);
if update_result.is_err() {
panic!("Unexpected update result: {:?}", update_result)
}
assert_eq!(network_graph.read_only().channels().len(), 2);
let after = network_graph.to_string();
assert!(
after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
);
assert!(
after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
);
assert!(
after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
);
assert!(
after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
);
assert!(after.contains("619737530008010752"));
assert!(after.contains("783241506229452801"));
}
#[test]
fn full_update_succeeds() {
let valid_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
0, 0, 0, 1, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 60, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 0, 0, 0, 58, 85, 116, 216, 0, 29, 0,
0, 0, 1, 0, 0, 0, 125, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1,
0, 0, 1,
];
let block_hash = genesis_block(Network::Bitcoin).block_hash();
let network_graph = NetworkGraph::new(block_hash);
assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &valid_input[..]);
if update_result.is_err() {
panic!("Unexpected update result: {:?}", update_result)
}
assert_eq!(network_graph.read_only().channels().len(), 2);
let after = network_graph.to_string();
assert!(
after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
);
assert!(
after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
);
assert!(
after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
);
assert!(
after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
);
assert!(after.contains("619737530008010752"));
assert!(after.contains("783241506229452801"));
}
}

View File

@ -158,7 +158,7 @@ mod sync {
#[cfg(test)] #[cfg(test)]
pub use debug_sync::*; pub use debug_sync::*;
#[cfg(not(test))] #[cfg(not(test))]
pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard}; pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
#[cfg(not(test))] #[cfg(not(test))]
pub use crate::util::fairrwlock::FairRwLock; pub use crate::util::fairrwlock::FairRwLock;
} }

View File

@ -630,7 +630,10 @@ pub struct UnsignedChannelUpdate {
pub fee_base_msat: u32, pub fee_base_msat: u32,
/// The amount to fee multiplier, in micro-satoshi /// The amount to fee multiplier, in micro-satoshi
pub fee_proportional_millionths: u32, pub fee_proportional_millionths: u32,
pub(crate) excess_data: Vec<u8>, /// Excess data which was signed as a part of the message which we do not (yet) understand how
/// to decode. This is stored to ensure forward-compatibility as new fields are added to the
/// lightning gossip
pub excess_data: Vec<u8>,
} }
/// A channel_update message to be sent or received from a peer /// A channel_update message to be sent or received from a peer
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]

View File

@ -712,6 +712,16 @@ impl ChannelInfo {
}; };
Some((DirectedChannelInfo::new(self, direction), target)) Some((DirectedChannelInfo::new(self, direction), target))
} }
/// Returns a [`ChannelUpdateInfo`] based on the direction implied by the channel_flag.
pub fn get_directional_info(&self, channel_flags: u8) -> Option<&ChannelUpdateInfo> {
let direction = channel_flags & 1u8;
if direction == 0 {
self.one_to_two.as_ref()
} else {
self.two_to_one.as_ref()
}
}
} }
impl fmt::Display for ChannelInfo { impl fmt::Display for ChannelInfo {
@ -1155,6 +1165,83 @@ impl NetworkGraph {
self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access) self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access)
} }
/// Update channel from partial announcement data received via rapid gossip sync
///
/// `timestamp: u64`: Timestamp emulating the backdated original announcement receipt (by the
/// rapid gossip sync server)
///
/// All other parameters as used in [`msgs::UnsignedChannelAnnouncement`] fields.
pub fn add_channel_from_partial_announcement(&self, short_channel_id: u64, timestamp: u64, features: ChannelFeatures, node_id_1: PublicKey, node_id_2: PublicKey) -> Result<(), LightningError> {
if node_id_1 == node_id_2 {
return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError});
};
let node_1 = NodeId::from_pubkey(&node_id_1);
let node_2 = NodeId::from_pubkey(&node_id_2);
let channel_info = ChannelInfo {
features,
node_one: node_1.clone(),
one_to_two: None,
node_two: node_2.clone(),
two_to_one: None,
capacity_sats: None,
announcement_message: None,
announcement_received_time: timestamp,
};
self.add_channel_between_nodes(short_channel_id, channel_info, None)
}
fn add_channel_between_nodes(&self, short_channel_id: u64, channel_info: ChannelInfo, utxo_value: Option<u64>) -> Result<(), LightningError> {
let mut channels = self.channels.write().unwrap();
let mut nodes = self.nodes.write().unwrap();
let node_id_a = channel_info.node_one.clone();
let node_id_b = channel_info.node_two.clone();
match channels.entry(short_channel_id) {
BtreeEntry::Occupied(mut entry) => {
//TODO: because asking the blockchain if short_channel_id is valid is only optional
//in the blockchain API, we need to handle it smartly here, though it's unclear
//exactly how...
if utxo_value.is_some() {
// Either our UTXO provider is busted, there was a reorg, or the UTXO provider
// only sometimes returns results. In any case remove the previous entry. Note
// that the spec expects us to "blacklist" the node_ids involved, but we can't
// do that because
// a) we don't *require* a UTXO provider that always returns results.
// b) we don't track UTXOs of channels we know about and remove them if they
// get reorg'd out.
// c) it's unclear how to do so without exposing ourselves to massive DoS risk.
Self::remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id);
*entry.get_mut() = channel_info;
} else {
return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip});
}
},
BtreeEntry::Vacant(entry) => {
entry.insert(channel_info);
}
};
for current_node_id in [node_id_a, node_id_b].iter() {
match nodes.entry(current_node_id.clone()) {
BtreeEntry::Occupied(node_entry) => {
node_entry.into_mut().channels.push(short_channel_id);
},
BtreeEntry::Vacant(node_entry) => {
node_entry.insert(NodeInfo {
channels: vec!(short_channel_id),
lowest_inbound_channel_fees: None,
announcement_info: None,
});
}
};
};
Ok(())
}
fn update_channel_from_unsigned_announcement_intern<C: Deref>( fn update_channel_from_unsigned_announcement_intern<C: Deref>(
&self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option<C> &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option<C>
) -> Result<(), LightningError> ) -> Result<(), LightningError>
@ -1214,54 +1301,7 @@ impl NetworkGraph {
announcement_received_time, announcement_received_time,
}; };
let mut channels = self.channels.write().unwrap(); self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)
let mut nodes = self.nodes.write().unwrap();
match channels.entry(msg.short_channel_id) {
BtreeEntry::Occupied(mut entry) => {
//TODO: because asking the blockchain if short_channel_id is valid is only optional
//in the blockchain API, we need to handle it smartly here, though it's unclear
//exactly how...
if utxo_value.is_some() {
// Either our UTXO provider is busted, there was a reorg, or the UTXO provider
// only sometimes returns results. In any case remove the previous entry. Note
// that the spec expects us to "blacklist" the node_ids involved, but we can't
// do that because
// a) we don't *require* a UTXO provider that always returns results.
// b) we don't track UTXOs of channels we know about and remove them if they
// get reorg'd out.
// c) it's unclear how to do so without exposing ourselves to massive DoS risk.
Self::remove_channel_in_nodes(&mut nodes, &entry.get(), msg.short_channel_id);
*entry.get_mut() = chan_info;
} else {
return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip});
}
},
BtreeEntry::Vacant(entry) => {
entry.insert(chan_info);
}
};
macro_rules! add_channel_to_node {
( $node_id: expr ) => {
match nodes.entry($node_id) {
BtreeEntry::Occupied(node_entry) => {
node_entry.into_mut().channels.push(msg.short_channel_id);
},
BtreeEntry::Vacant(node_entry) => {
node_entry.insert(NodeInfo {
channels: vec!(msg.short_channel_id),
lowest_inbound_channel_fees: None,
announcement_info: None,
});
}
}
};
}
add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_1));
add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_2));
Ok(())
} }
/// Close a channel if a corresponding HTLC fail was sent. /// Close a channel if a corresponding HTLC fail was sent.

View File

@ -301,7 +301,7 @@ impl Readable for U48 {
/// encoded in several different ways, which we must check for at deserialization-time. Thus, if /// encoded in several different ways, which we must check for at deserialization-time. Thus, if
/// you're looking for an example of a variable-length integer to use for your own project, move /// you're looking for an example of a variable-length integer to use for your own project, move
/// along, this is a rather poor design. /// along, this is a rather poor design.
pub(crate) struct BigSize(pub u64); pub struct BigSize(pub u64);
impl Writeable for BigSize { impl Writeable for BigSize {
#[inline] #[inline]
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> { fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {