diff --git a/examples/wasm_thread_example/.gitignore b/examples/wasm_thread_example/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/examples/wasm_thread_example/.gitignore @@ -0,0 +1 @@ +/target diff --git a/examples/wasm_thread_example/Cargo.toml b/examples/wasm_thread_example/Cargo.toml new file mode 100644 index 0000000..1223616 --- /dev/null +++ b/examples/wasm_thread_example/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "wasm_thread_example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +wasm_thread = { git = "https://github.com/chemicstry/wasm_thread.git", features = [ + "es_modules", +], branch = "refactor" } +wasm-bindgen = { version = "0.2" } +wasm-bindgen-futures = "0.4.34" + +# Dev dependencies +log = { version = "0.4" } +env_logger = { version = "0.8" } +console_error_panic_hook = { version = "0.1.6" } +console_log = { version = "1.0.0", features = ["color"] } diff --git a/examples/wasm_thread_example/pkg/.gitignore b/examples/wasm_thread_example/pkg/.gitignore new file mode 100644 index 0000000..c4adb7f --- /dev/null +++ b/examples/wasm_thread_example/pkg/.gitignore @@ -0,0 +1,2 @@ +snippets +wasm_thread_example* diff --git a/examples/wasm_thread_example/pkg/index.html b/examples/wasm_thread_example/pkg/index.html new file mode 100644 index 0000000..3876a9b --- /dev/null +++ b/examples/wasm_thread_example/pkg/index.html @@ -0,0 +1,12 @@ + + + + + + Page Title + + + + + + diff --git a/examples/wasm_thread_example/pkg/index.js b/examples/wasm_thread_example/pkg/index.js new file mode 100644 index 0000000..1c12f90 --- /dev/null +++ b/examples/wasm_thread_example/pkg/index.js @@ -0,0 +1,12 @@ +const worker = new Worker("./worker.js", { type: "module" }); + +worker.postMessage({ type: "init" }); +worker.onmessage = (event) => { + if (event.data.type !== "initResponse") return; + + for (let i = 0; i < 100000; i++) { + worker.postMessage({ type: "encode" }); + } + + worker.postMessage({ type: "finish" }); +}; diff --git a/examples/wasm_thread_example/pkg/serve.json b/examples/wasm_thread_example/pkg/serve.json new file mode 100644 index 0000000..30dd586 --- /dev/null +++ b/examples/wasm_thread_example/pkg/serve.json @@ -0,0 +1,17 @@ +{ + "headers": [ + { + "source": "*", + "headers": [ + { + "key": "Cross-Origin-Opener-Policy", + "value": "same-origin" + }, + { + "key": "Cross-Origin-Embedder-Policy", + "value": "require-corp" + } + ] + } + ] +} diff --git a/examples/wasm_thread_example/pkg/worker.js b/examples/wasm_thread_example/pkg/worker.js new file mode 100644 index 0000000..eb9df2d --- /dev/null +++ b/examples/wasm_thread_example/pkg/worker.js @@ -0,0 +1,40 @@ +/// From https://github.com/rustwasm/wasm-bindgen/tree/main/examples/raytrace-parallel + +// synchronously, using the browser, import out shim JS scripts +import init, { Encoder } from "./wasm_thread_example.js"; + +// Wait for the main thread to send us the shared module/memory. Once we've got +// it, initialize it all with the `wasm_bindgen` global we imported via +// `importScripts`. +// +// After our first message all subsequent messages are an entry point to run, +// so we just do that. +let encoder; +let lastCount; +let diffCount = 0; +self.onmessage = async (event) => { + switch (event.data.type) { + case "init": { + await init(); + encoder = new Encoder(); + self.postMessage({ type: "initResponse" }); + break; + } + case "encode": { + encoder.increment(); + const count = encoder.count(); + if (count !== lastCount) { + lastCount = count; + console.log(count); + diffCount++; + } + break; + } + case "finish": { + const result = await encoder.stop(); + console.log("diffCount", diffCount); + self.postMessage({ type: "finishResponse", result }); + break; + } + } +}; diff --git a/examples/wasm_thread_example/src/lib.rs b/examples/wasm_thread_example/src/lib.rs new file mode 100644 index 0000000..fa0858b --- /dev/null +++ b/examples/wasm_thread_example/src/lib.rs @@ -0,0 +1,74 @@ +use std::sync::{atomic::AtomicUsize, mpsc::Sender}; + +use wasm_bindgen::prelude::*; +use wasm_thread as thread; + +#[wasm_bindgen] +pub struct Encoder { + counter_thread: Option>, + queue: Option>, +} + +static COUNT: AtomicUsize = AtomicUsize::new(0); + +#[wasm_bindgen] +impl Encoder { + #[wasm_bindgen(constructor)] + pub fn new() -> Self { + std::panic::set_hook(Box::new(console_error_panic_hook::hook)); + console_log::init_with_level(log::Level::Info).expect("Couldn't initialize logger"); + + let (queue, receiver) = std::sync::mpsc::channel(); + + wasm_thread::Builder::default() + .prefix(String::from("gif")) + .set_default(); + + log::info!("Creating counter thread"); + + let counter_thread = thread::Builder::new() + .name(String::from("counter")) + .spawn(move || { + log::info!("Counter thread started"); + while let Ok(_) = receiver.recv() { + let _old_count = COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + log::info!("Counter thread stopped"); + COUNT.load(std::sync::atomic::Ordering::Relaxed) + }) + .unwrap(); + + log::info!("Created counter thread"); + + Self { + counter_thread: Some(counter_thread), + queue: Some(queue), + } + } + + pub fn count(&self) -> usize { + COUNT.load(std::sync::atomic::Ordering::Relaxed) + } + + pub fn increment(&mut self) { + self.queue + .as_ref() + .expect("Encoder was closed") + .send(()) + .unwrap(); + } + + pub async fn stop(&mut self) { + self.queue = None; + + match self.counter_thread.take().unwrap().join_async().await { + Ok(count) => { + log::info!("Count: {}", count); + } + Err(e) => { + log::error!("Error: {:?}", e); + } + } + } +} diff --git a/examples/wasm_thread_example/wasm-pack.sh b/examples/wasm_thread_example/wasm-pack.sh new file mode 100755 index 0000000..e174c17 --- /dev/null +++ b/examples/wasm_thread_example/wasm-pack.sh @@ -0,0 +1,26 @@ +#!/bin/sh + +case $1 in + *"-r"*) # Matches -r or --release + CONF="--release -Z build-std-features=panic_immediate_abort" + BINDGEN="" + FILE_PATH="release" + ;; + *) + CONF="" + BINDGEN="--keep-debug" + FILE_PATH="debug" + ;; +esac + +RUSTFLAGS='-C target-feature=+atomics,+bulk-memory,+mutable-globals' \ + cargo +nightly build \ + --target wasm32-unknown-unknown \ + -Z build-std=std,panic_abort \ + ${CONF} + +wasm-bindgen \ + target/wasm32-unknown-unknown/${FILE_PATH}/wasm_thread_example.wasm \ + ${BINDGEN} \ + --target web \ + --out-dir ./pkg diff --git a/tests/native.rs b/tests/native.rs index 5e8a0fe..745e424 100644 --- a/tests/native.rs +++ b/tests/native.rs @@ -1,5 +1,10 @@ //! Trivial tests to ensure that native thread API is unchanged. +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + use wasm_thread as thread; #[test] @@ -38,3 +43,72 @@ fn thread_scope() { a.push(4); assert_eq!(x, a.len()); } + +#[test] +fn thread_messaging() { + use std::sync::mpsc::{channel, Receiver}; + + let (tx, rx) = channel(); + static ATOMIC_COUNT: AtomicUsize = AtomicUsize::new(0); + + fn reader_callback(rx: Receiver) { + while let Ok(_) = rx.recv() { + let old_value = ATOMIC_COUNT.fetch_add(1, Ordering::Relaxed); + if old_value == usize::MAX { + break; + } + thread::sleep(Duration::from_millis(200)); + } + } + + let reader_thread = thread::Builder::new() + .name(String::from("reader")) + .spawn(|| reader_callback(rx)) + .unwrap(); + + for i in 0..1000 { + tx.send(format!("message {}", i)).unwrap(); + } + + let _ = thread::spawn(move || { + drop(tx); + thread::sleep(Duration::from_millis(1100)); + + let value = ATOMIC_COUNT.load(Ordering::Relaxed); + std::assert_eq!(value, 6); + ATOMIC_COUNT.store(usize::MAX, Ordering::Relaxed); + }) + .join() + .unwrap(); + + reader_thread.join().unwrap(); +} + +#[test] +fn thread_no_join() { + use std::sync::{atomic::AtomicBool, mpsc::channel}; + + let (tx, rx) = channel(); + static ATOMIC_STARTED: AtomicBool = AtomicBool::new(false); + + let _ = thread::Builder::new() + .name(String::from("polled")) + .spawn(move || { + ATOMIC_STARTED.store(true, Ordering::Relaxed); + rx.recv().unwrap(); + }) + .unwrap(); + + let _ = thread::Builder::new() + .name(String::from("awaiter")) + .spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let started = ATOMIC_STARTED.load(Ordering::Relaxed); + std::assert_eq!(started, true); + }) + .unwrap() + .join() + .unwrap(); + + tx.send(()).unwrap(); +} diff --git a/tests/wasm.rs b/tests/wasm.rs index 92532ba..ec4684b 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -1,7 +1,7 @@ #![cfg(target_arch = "wasm32")] use core::{ - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, time::Duration, }; @@ -97,3 +97,77 @@ async fn thread_scope_sync_block() { .await .unwrap(); } + +#[wasm_bindgen_test] +async fn thread_messaging() { + use std::{ + sync::mpsc::{channel, Receiver}, + thread as std_thread, + }; + + let (tx, rx) = channel(); + static ATOMIC_COUNT: AtomicUsize = AtomicUsize::new(0); + + fn reader_callback(rx: Receiver) { + while let Ok(_) = rx.recv() { + let old_value = ATOMIC_COUNT.fetch_add(1, Ordering::Relaxed); + if old_value == usize::MAX { + break; + } + + std_thread::sleep(Duration::from_millis(200)); + } + } + + let reader_thread = thread::Builder::new() + .name(String::from("reader")) + .spawn(|| reader_callback(rx)) + .unwrap(); + + for i in 0..1000 { + tx.send(format!("message {}", i)).unwrap(); + } + + let _ = thread::spawn(move || { + std_thread::sleep(Duration::from_millis(1100)); + + let value = ATOMIC_COUNT.load(Ordering::Relaxed); + std::assert_eq!(value, 6); + ATOMIC_COUNT.store(usize::MAX, Ordering::Relaxed); + }) + .join_async() + .await + .unwrap(); + + reader_thread.join_async().await.unwrap(); +} + +#[wasm_bindgen_test] +async fn thread_no_join() { + use std::sync::mpsc::channel; + + let (tx, rx) = channel(); + static ATOMIC_STARTED: AtomicBool = AtomicBool::new(false); + + let _ = thread::Builder::new() + .name(String::from("polled")) + .spawn(move || { + ATOMIC_STARTED.store(true, Ordering::Relaxed); + rx.recv().unwrap(); + }) + .unwrap(); + + let _ = thread::Builder::new() + .name(String::from("awaiter")) + .spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let started = ATOMIC_STARTED.load(Ordering::Relaxed); + std::assert_eq!(started, true); + }) + .unwrap() + .join_async() + .await + .unwrap(); + + tx.send(()).unwrap(); +}