Use tokio async/await instead of callbacks

This commit is contained in:
junderw 2023-06-24 19:28:19 -07:00 committed by Mononaut
parent 5f161e73c7
commit 8cfda1a546
No known key found for this signature in database
GPG key ID: A3F058E41374C04E
6 changed files with 148 additions and 67 deletions

110
Cargo.lock generated
View file

@ -72,6 +72,15 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hermit-abi"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
[[package]]
name = "indexmap"
version = "1.9.3"
@ -82,6 +91,12 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "libc"
version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]]
name = "libloading"
version = "0.7.4"
@ -109,6 +124,7 @@ dependencies = [
"napi-derive",
"napi-sys",
"once_cell",
"tokio",
]
[[package]]
@ -155,12 +171,28 @@ dependencies = [
"libloading",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "priority-queue"
version = "1.3.2"
@ -234,6 +266,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
dependencies = [
"autocfg",
"num_cpus",
"pin-project-lite",
"windows-sys",
]
[[package]]
name = "unicode-ident"
version = "1.0.9"
@ -267,3 +311,69 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"

View file

@ -15,7 +15,7 @@ crate-type = ["cdylib"]
priority-queue = "1.3.2"
bytes = "1.4.0"
once_cell = "1.18.0"
napi = { version = "2.13.2", features = ["napi8"] }
napi = { version = "2.13.2", features = ["napi8", "tokio_rt"] }
napi-derive = "2.13.0"
[build-dependencies]

View file

@ -3,8 +3,8 @@
/* auto-generated by NAPI-RS */
export function make(mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void): void
export function update(newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void): void
export function make(mempoolBuffer: Uint8Array): Promise<GbtResult>
export function update(newTxs: Uint8Array, removeTxs: Uint8Array): Promise<GbtResult>
/**
* The result from calling the gbt function.
*

View file

@ -7,8 +7,8 @@
"scripts": {
"artifacts": "napi artifacts",
"build": "napi build --platform",
"build-debug": "npm run build --",
"build-release": "npm run build -- --release",
"build-debug": "npm run build",
"build-release": "npm run build -- --release --strip",
"install": "npm run build-release",
"prepublishOnly": "napi prepublish -t npm",
"test": "cargo test"

View file

@ -1,7 +1,4 @@
use napi::{
bindgen_prelude::*,
threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use once_cell::sync::Lazy;
@ -14,43 +11,41 @@ mod thread_transaction;
mod utils;
use thread_transaction::ThreadTransaction;
/// Used for ThreadsafeFunction's queue size parameter
const UNBOUNDED_QUEUE: usize = 0;
static THREAD_TRANSACTIONS: Lazy<Mutex<HashMap<u32, ThreadTransaction>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[napi(ts_args_type = "mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void")]
pub fn make(mempool_buffer: Uint8Array, callback: JsFunction) -> Result<()> {
#[napi(ts_args_type = "mempoolBuffer: Uint8Array")]
pub async fn make(mempool_buffer: Uint8Array) -> Result<GbtResult> {
let mut map = HashMap::new();
for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) {
map.insert(tx.uid, tx);
}
let mut global_map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
*global_map = map;
{
let mut global_map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
*global_map = map;
}
run_in_thread(callback)
run_in_thread().await
}
#[napi(
ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void"
)]
pub fn update(new_txs: Uint8Array, remove_txs: Uint8Array, callback: JsFunction) -> Result<()> {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
map.insert(tx.uid, tx);
#[napi(ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array")]
pub async fn update(new_txs: Uint8Array, remove_txs: Uint8Array) -> Result<GbtResult> {
{
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
map.insert(tx.uid, tx);
}
for txid in &utils::txids_from_buffer(&remove_txs) {
map.remove(txid);
}
}
for txid in &utils::txids_from_buffer(&remove_txs) {
map.remove(txid);
}
drop(map);
run_in_thread(callback)
run_in_thread().await
}
/// The result from calling the gbt function.
@ -66,31 +61,15 @@ pub struct GbtResult {
pub rates: Vec<Vec<f64>>, // Tuples not supported. u32 fits inside f64
}
fn run_in_thread(callback: JsFunction) -> Result<()> {
let thread_safe_callback: ThreadsafeFunction<GbtResult, ErrorStrategy::Fatal> =
callback.create_threadsafe_function(UNBOUNDED_QUEUE, |ctx| Ok(vec![ctx.value]))?;
let handle = std::thread::spawn(move || {
let result = {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed"))?
};
// Note: A call mode of Blocking does not mean it will block, but rather it tells
// the N-API what to do in the event of a full queue.
// The queue will never be full, so Blocking is fine.
match thread_safe_callback.call(result, ThreadsafeFunctionCallMode::Blocking) {
Status::Ok => Ok(()),
error => Err(napi::Error::from_reason(format!(
"Callback failure: {}",
error
))),
}
async fn run_in_thread() -> Result<GbtResult> {
let handle = napi::tokio::task::spawn_blocking(move || {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed"))
});
handle
.join()
.await
.map_err(|_| napi::Error::from_reason("thread panicked"))?
}

View file

@ -342,12 +342,7 @@ class MempoolBlocks {
// run the block construction algorithm in a separate thread, and wait for a result
try {
const { blocks, rates, clusters } = this.convertNapiResultTxids(
await new Promise((resolve) => {
napiAddon.make(
new Uint8Array(mempoolBuffer),
resolve,
);
})
await napiAddon.make(new Uint8Array(mempoolBuffer)),
);
this.rustInitialized = true;
const processed = this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults);
@ -381,13 +376,10 @@ class MempoolBlocks {
// run the block construction algorithm in a separate thread, and wait for a result
try {
const { blocks, rates, clusters } = this.convertNapiResultTxids(
await new Promise((resolve) => {
napiAddon.update(
await napiAddon.update(
new Uint8Array(addedBuffer),
new Uint8Array(removedBuffer),
resolve,
);
})
),
);
this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults);
logger.debug(`RUST updateBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);