From 39fc299db034ee00a44f95671f0ca8ff6e614ddb Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 29 Oct 2024 19:13:13 +0100 Subject: [PATCH] Prepare for web-support for gRPC client --- Cargo.lock | 34 ++++++++++++++++-- Cargo.toml | 5 +-- .../re_remote_store_types_builder/Cargo.toml | 4 ++- crates/store/re_remote_store_types/Cargo.toml | 17 ++++++++- crates/store/re_rrdp_comms/Cargo.toml | 15 ++++++-- crates/store/re_rrdp_comms/src/lib.rs | 35 ++++++++++++++++--- crates/top/rerun-cli/Cargo.toml | 5 ++- crates/viewer/re_viewer/Cargo.toml | 5 +-- crates/viewer/re_viewer/src/web_tools.rs | 8 +++++ 9 files changed, 110 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a7744331bd0..c03658ed49d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,9 +1103,9 @@ dependencies = [ [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -5956,6 +5956,7 @@ dependencies = [ "re_log_types", "thiserror", "tonic", + "tonic-web-wasm-client", ] [[package]] @@ -6058,6 +6059,8 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tonic-web-wasm-client", + "wasm-bindgen-futures", ] [[package]] @@ -6600,6 +6603,7 @@ dependencies = [ "re_memory", "re_query", "re_renderer", + "re_rrdp_comms", "re_sdk_comms", "re_selection_panel", "re_smart_channel", @@ -8337,7 +8341,6 @@ dependencies = [ "libc", "mio 1.0.2", "pin-project-lite", - "signal-hook-registry", "socket2 0.5.7", "tokio-macros", "windows-sys 0.52.0", @@ -8478,6 +8481,31 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "tonic-web-wasm-client" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef5ca6e7bdd0042c440d36b6df97c1436f1d45871ce18298091f114004b1beb4" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "httparse", + "js-sys", + "pin-project", + "thiserror", + "tonic", + "tower-service", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 70acb9f1383c..cb65be4d838f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -270,8 +270,9 @@ tobj = "4.0" tokio = { version = "1.40.0", default-features = false } tokio-stream = "0.1.16" toml = { version = "0.8.10", default-features = false } -tonic = "0.12.3" -tonic-build = "0.12.3" +tonic = { version = "0.12.3", default-features = false } +tonic-build = { version = "0.12.3", default-features = false } +tonic-web-wasm-client = "0.6" tracing = { version = "0.1", default-features = false } tungstenite = { version = "0.23", default-features = false } type-map = "0.5" diff --git a/crates/build/re_remote_store_types_builder/Cargo.toml b/crates/build/re_remote_store_types_builder/Cargo.toml index f104efa1caa0..fa4435da3666 100644 --- a/crates/build/re_remote_store_types_builder/Cargo.toml +++ b/crates/build/re_remote_store_types_builder/Cargo.toml @@ -16,7 +16,9 @@ re_log = { workspace = true, features = ["setup"] } # External camino.workspace = true -tonic-build.workspace = true +tonic-build = { workspace = true, default-features = false, features = [ + "prost", +] } [lints] workspace = true diff --git a/crates/store/re_remote_store_types/Cargo.toml b/crates/store/re_remote_store_types/Cargo.toml index b342fa235da9..d1ad181405ba 100644 --- a/crates/store/re_remote_store_types/Cargo.toml +++ b/crates/store/re_remote_store_types/Cargo.toml @@ -17,7 +17,22 @@ re_dataframe.workspace = true arrow2 = { workspace = true, features = ["io_ipc"] } prost.workspace = true thiserror.workspace = true -tonic.workspace = true + +# Native dependencies: +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tonic = { workspace = true, defauklt-features = false, features = [ + "codegen", + "prost", + "transport", +] } + +# Web dependencies: +[target.'cfg(target_arch = "wasm32")'.dependencies] +tonic = { workspace = true, defauklt-features = false, features = [ + "codegen", + "prost", +] } +tonic-web-wasm-client.workspace = true [lints] workspace = true diff --git a/crates/store/re_rrdp_comms/Cargo.toml b/crates/store/re_rrdp_comms/Cargo.toml index 4b5546784e8b..6a9c1e48349e 100644 --- a/crates/store/re_rrdp_comms/Cargo.toml +++ b/crates/store/re_rrdp_comms/Cargo.toml @@ -27,7 +27,16 @@ re_log.workspace = true re_remote_store_types.workspace = true re_smart_channel.workspace = true -anyhow.workspace = true +anyhow.workspace = true # TODO: use thiserror for everything thiserror.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } -tokio-stream = { workspace = true } +tokio-stream.workspace = true + +# Native dependencies: +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio.workspace = true + + +# Web dependencies: +[target.'cfg(target_arch = "wasm32")'.dependencies] +tonic-web-wasm-client.workspace = true +wasm-bindgen-futures.workspace = true diff --git a/crates/store/re_rrdp_comms/src/lib.rs b/crates/store/re_rrdp_comms/src/lib.rs index 454ef922ea3d..be4cd6d54d00 100644 --- a/crates/store/re_rrdp_comms/src/lib.rs +++ b/crates/store/re_rrdp_comms/src/lib.rs @@ -13,7 +13,6 @@ use re_remote_store_types::{ storage_node_client::StorageNodeClient, EncoderVersion, FetchRecordingRequest, RecordingId, }, }; -use tokio_stream::StreamExt; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -94,7 +93,7 @@ pub fn stream_recording( re_smart_channel::SmartChannelSource::RrdpStream { url: url.clone() }, ); - tokio::spawn(async move { + spawn_future(async move { if let Err(err) = stream_recording_async(tx, address, on_msg).await { re_log::warn!( "Failed to fetch whole recording from {url}: {}", @@ -106,11 +105,29 @@ pub fn stream_recording( Ok(rx) } +#[cfg(target_arch = "wasm32")] +fn spawn_future(future: F) +where + F: std::future::Future + 'static, +{ + wasm_bindgen_futures::spawn_local(future); +} + +#[cfg(not(target_arch = "wasm32"))] +fn spawn_future(future: F) +where + F: std::future::Future + 'static + Send, +{ + tokio::spawn(future); +} + async fn stream_recording_async( tx: re_smart_channel::Sender, address: Address, on_msg: Option>, ) -> anyhow::Result<()> { + use tokio_stream::StreamExt as _; + let Address { addr_port, recording_id, @@ -119,9 +136,17 @@ async fn stream_recording_async( let http_addr = format!("http://{addr_port}"); re_log::debug!("Connecting to {http_addr}…"); - let mut client = StorageNodeClient::connect(http_addr) - .await? - .max_decoding_message_size(1024 * 1024 * 1024); + #[cfg(target_arch = "wasm32")] + let mut client = StorageNodeClient::new(tonic_web_wasm_client::Client::new_with_options( + http_addr, + tonic_web_wasm_client::options::FetchOptions::new() + .mode(tonic_web_wasm_client::options::Mode::Cors), // TODO: is this needed? + )); + + #[cfg(not(target_arch = "wasm32"))] + let mut client = StorageNodeClient::connect(http_addr).await?; + + client = client.max_decoding_message_size(1024 * 1024 * 1024); re_log::debug!("Fetching {recording_id}…"); diff --git a/crates/top/rerun-cli/Cargo.toml b/crates/top/rerun-cli/Cargo.toml index 242a77c7a59c..d3a1c3f56bc5 100644 --- a/crates/top/rerun-cli/Cargo.toml +++ b/crates/top/rerun-cli/Cargo.toml @@ -88,7 +88,10 @@ rerun = { workspace = true, features = [ document-features.workspace = true mimalloc = "0.1.43" -tokio = { workspace = true, optional = true, features = ["macros"] } +tokio = { workspace = true, optional = true, features = [ + "macros", + "rt-multi-thread", +] } [build-dependencies] diff --git a/crates/viewer/re_viewer/Cargo.toml b/crates/viewer/re_viewer/Cargo.toml index 9ec74a11305b..b26db53ac94c 100644 --- a/crates/viewer/re_viewer/Cargo.toml +++ b/crates/viewer/re_viewer/Cargo.toml @@ -31,7 +31,7 @@ crate-type = ["cdylib", "rlib"] [features] -default = ["analytics"] +default = ["analytics", "rrdp"] # TODO: not a default feature? ## Enable telemetry using our analytics SDK. analytics = ["dep:re_analytics"] @@ -40,7 +40,7 @@ analytics = ["dep:re_analytics"] map_view = ["dep:re_space_view_map"] ## Enable the rrdp data source. -rrdp = ["re_data_source/rrdp"] +rrdp = ["re_data_source/rrdp", "dep:re_rrdp_comms"] [dependencies] @@ -91,6 +91,7 @@ re_ws_comms = { workspace = true, features = ["client"] } # Internal (optional): re_analytics = { workspace = true, optional = true } +re_rrdp_comms = { workspace = true, optional = true } re_space_view_map = { workspace = true, optional = true } diff --git a/crates/viewer/re_viewer/src/web_tools.rs b/crates/viewer/re_viewer/src/web_tools.rs index 160a3778b26e..43d1c4cea4fb 100644 --- a/crates/viewer/re_viewer/src/web_tools.rs +++ b/crates/viewer/re_viewer/src/web_tools.rs @@ -85,6 +85,9 @@ enum EndpointCategory { /// Could be a link to either an `.rrd` recording or a `.rbl` blueprint. HttpRrd(String), + /// gRPC Rerun DataPlatform URL, e.g. `rrdp://ip:port/recording/1234` + DataPlatform(String), + /// A remote Rerun server. WebSocket(String), @@ -96,6 +99,8 @@ impl EndpointCategory { fn categorize_uri(uri: String) -> Self { if uri.starts_with("http") || uri.ends_with(".rrd") || uri.ends_with(".rbl") { Self::HttpRrd(uri) + } else if uri.starts_with("rrdp://") { + Self::DataPlatform(uri) } else if uri.starts_with("ws:") || uri.starts_with("wss:") { Self::WebSocket(uri) } else if uri.starts_with("web_event:") { @@ -132,6 +137,9 @@ pub fn url_to_receiver( Some(ui_waker), ), ), + EndpointCategory::DataPlatform(url) => { + re_rrdp_comms::stream_recording(url, Some(ui_waker)).map_err(|err| err.into()) + } EndpointCategory::WebEventListener(url) => { // Process an rrd when it's posted via `window.postMessage` let (tx, rx) = re_smart_channel::smart_channel(