From 8cfda1a5464e5c16ed2694f090b2102e62c3453c Mon Sep 17 00:00:00 2001 From: junderw Date: Sat, 24 Jun 2023 19:28:19 -0700 Subject: [PATCH] Use tokio async/await instead of callbacks --- Cargo.lock | 110 ++++++++++++++++++++++++++++++ backend/rust-gbt/Cargo.toml | 2 +- backend/rust-gbt/index.d.ts | 4 +- backend/rust-gbt/package.json | 4 +- backend/rust-gbt/src/lib.rs | 81 ++++++++-------------- backend/src/api/mempool-blocks.ts | 14 +--- 6 files changed, 148 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 350e25fb7..7837483d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -82,6 +91,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "libc" +version = "0.2.146" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" + [[package]] name = "libloading" version = "0.7.4" @@ -109,6 +124,7 @@ dependencies = [ "napi-derive", "napi-sys", "once_cell", + "tokio", ] [[package]] @@ -155,12 +171,28 @@ dependencies = [ "libloading", ] +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "priority-queue" version = "1.3.2" @@ -234,6 +266,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +dependencies = [ + "autocfg", + "num_cpus", + "pin-project-lite", + "windows-sys", +] + [[package]] name = "unicode-ident" version = "1.0.9" @@ -267,3 +311,69 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/backend/rust-gbt/Cargo.toml b/backend/rust-gbt/Cargo.toml index bf5073240..a0f593518 100644 --- a/backend/rust-gbt/Cargo.toml +++ b/backend/rust-gbt/Cargo.toml @@ -15,7 +15,7 @@ crate-type = ["cdylib"] priority-queue = "1.3.2" bytes = "1.4.0" once_cell = "1.18.0" -napi = { version = "2.13.2", features = ["napi8"] } +napi = { version = "2.13.2", features = ["napi8", "tokio_rt"] } napi-derive = "2.13.0" [build-dependencies] diff --git a/backend/rust-gbt/index.d.ts b/backend/rust-gbt/index.d.ts index 3c8875a69..793d78c4e 100644 --- a/backend/rust-gbt/index.d.ts +++ b/backend/rust-gbt/index.d.ts @@ -3,8 +3,8 @@ /* auto-generated by NAPI-RS */ -export function make(mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void): void -export function update(newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void): void +export function make(mempoolBuffer: Uint8Array): Promise +export function update(newTxs: Uint8Array, removeTxs: Uint8Array): Promise /** * The result from calling the gbt function. * diff --git a/backend/rust-gbt/package.json b/backend/rust-gbt/package.json index 3c6c7a8f5..57f91e781 100644 --- a/backend/rust-gbt/package.json +++ b/backend/rust-gbt/package.json @@ -7,8 +7,8 @@ "scripts": { "artifacts": "napi artifacts", "build": "napi build --platform", - "build-debug": "npm run build --", - "build-release": "npm run build -- --release", + "build-debug": "npm run build", + "build-release": "npm run build -- --release --strip", "install": "npm run build-release", "prepublishOnly": "napi prepublish -t npm", "test": "cargo test" diff --git a/backend/rust-gbt/src/lib.rs b/backend/rust-gbt/src/lib.rs index 23f8f37e4..0cdeb74e3 100644 --- a/backend/rust-gbt/src/lib.rs +++ b/backend/rust-gbt/src/lib.rs @@ -1,7 +1,4 @@ -use napi::{ - bindgen_prelude::*, - threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}, -}; +use napi::bindgen_prelude::*; use napi_derive::napi; use once_cell::sync::Lazy; @@ -14,43 +11,41 @@ mod thread_transaction; mod utils; use thread_transaction::ThreadTransaction; -/// Used for ThreadsafeFunction's queue size parameter -const UNBOUNDED_QUEUE: usize = 0; - static THREAD_TRANSACTIONS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); -#[napi(ts_args_type = "mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void")] -pub fn make(mempool_buffer: Uint8Array, callback: JsFunction) -> Result<()> { +#[napi(ts_args_type = "mempoolBuffer: Uint8Array")] +pub async fn make(mempool_buffer: Uint8Array) -> Result { let mut map = HashMap::new(); for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) { map.insert(tx.uid, tx); } - let mut global_map = THREAD_TRANSACTIONS - .lock() - .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; - *global_map = map; + { + let mut global_map = THREAD_TRANSACTIONS + .lock() + .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; + *global_map = map; + } - run_in_thread(callback) + run_in_thread().await } -#[napi( - ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void" -)] -pub fn update(new_txs: Uint8Array, remove_txs: Uint8Array, callback: JsFunction) -> Result<()> { - let mut map = THREAD_TRANSACTIONS - .lock() - .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; - for tx in ThreadTransaction::batch_from_buffer(&new_txs) { - map.insert(tx.uid, tx); +#[napi(ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array")] +pub async fn update(new_txs: Uint8Array, remove_txs: Uint8Array) -> Result { + { + let mut map = THREAD_TRANSACTIONS + .lock() + .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; + for tx in ThreadTransaction::batch_from_buffer(&new_txs) { + map.insert(tx.uid, tx); + } + for txid in &utils::txids_from_buffer(&remove_txs) { + map.remove(txid); + } } - for txid in &utils::txids_from_buffer(&remove_txs) { - map.remove(txid); - } - drop(map); - run_in_thread(callback) + run_in_thread().await } /// The result from calling the gbt function. @@ -66,31 +61,15 @@ pub struct GbtResult { pub rates: Vec>, // Tuples not supported. u32 fits inside f64 } -fn run_in_thread(callback: JsFunction) -> Result<()> { - let thread_safe_callback: ThreadsafeFunction = - callback.create_threadsafe_function(UNBOUNDED_QUEUE, |ctx| Ok(vec![ctx.value]))?; - - let handle = std::thread::spawn(move || { - let result = { - let mut map = THREAD_TRANSACTIONS - .lock() - .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; - gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed"))? - }; - - // Note: A call mode of Blocking does not mean it will block, but rather it tells - // the N-API what to do in the event of a full queue. - // The queue will never be full, so Blocking is fine. - match thread_safe_callback.call(result, ThreadsafeFunctionCallMode::Blocking) { - Status::Ok => Ok(()), - error => Err(napi::Error::from_reason(format!( - "Callback failure: {}", - error - ))), - } +async fn run_in_thread() -> Result { + let handle = napi::tokio::task::spawn_blocking(move || { + let mut map = THREAD_TRANSACTIONS + .lock() + .map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?; + gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed")) }); handle - .join() + .await .map_err(|_| napi::Error::from_reason("thread panicked"))? } diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index ed8fbb172..a4786af7c 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -342,12 +342,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result try { const { blocks, rates, clusters } = this.convertNapiResultTxids( - await new Promise((resolve) => { - napiAddon.make( - new Uint8Array(mempoolBuffer), - resolve, - ); - }) + await napiAddon.make(new Uint8Array(mempoolBuffer)), ); this.rustInitialized = true; const processed = this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); @@ -381,13 +376,10 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result try { const { blocks, rates, clusters } = this.convertNapiResultTxids( - await new Promise((resolve) => { - napiAddon.update( + await napiAddon.update( new Uint8Array(addedBuffer), new Uint8Array(removedBuffer), - resolve, - ); - }) + ), ); this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); logger.debug(`RUST updateBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);