Skip to content

Commit

Permalink
Prepare for web-support for gRPC client
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Oct 29, 2024
1 parent 0e4c7aa commit 39fc299
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 18 deletions.
34 changes: 31 additions & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1103,9 +1103,9 @@ dependencies = [

[[package]]
name = "byteorder"
version = "1.4.3"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

[[package]]
name = "bytes"
Expand Down Expand Up @@ -5956,6 +5956,7 @@ dependencies = [
"re_log_types",
"thiserror",
"tonic",
"tonic-web-wasm-client",
]

[[package]]
Expand Down Expand Up @@ -6058,6 +6059,8 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic-web-wasm-client",
"wasm-bindgen-futures",
]

[[package]]
Expand Down Expand Up @@ -6600,6 +6603,7 @@ dependencies = [
"re_memory",
"re_query",
"re_renderer",
"re_rrdp_comms",
"re_sdk_comms",
"re_selection_panel",
"re_smart_channel",
Expand Down Expand Up @@ -8337,7 +8341,6 @@ dependencies = [
"libc",
"mio 1.0.2",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.7",
"tokio-macros",
"windows-sys 0.52.0",
Expand Down Expand Up @@ -8478,6 +8481,31 @@ dependencies = [
"syn 2.0.79",
]

[[package]]
name = "tonic-web-wasm-client"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef5ca6e7bdd0042c440d36b6df97c1436f1d45871ce18298091f114004b1beb4"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"httparse",
"js-sys",
"pin-project",
"thiserror",
"tonic",
"tower-service",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
]

[[package]]
name = "tower"
version = "0.4.13"
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ tobj = "4.0"
tokio = { version = "1.40.0", default-features = false }
tokio-stream = "0.1.16"
toml = { version = "0.8.10", default-features = false }
tonic = "0.12.3"
tonic-build = "0.12.3"
tonic = { version = "0.12.3", default-features = false }
tonic-build = { version = "0.12.3", default-features = false }
tonic-web-wasm-client = "0.6"
tracing = { version = "0.1", default-features = false }
tungstenite = { version = "0.23", default-features = false }
type-map = "0.5"
Expand Down
4 changes: 3 additions & 1 deletion crates/build/re_remote_store_types_builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ re_log = { workspace = true, features = ["setup"] }

# External
camino.workspace = true
tonic-build.workspace = true
tonic-build = { workspace = true, default-features = false, features = [
"prost",
] }

[lints]
workspace = true
17 changes: 16 additions & 1 deletion crates/store/re_remote_store_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,22 @@ re_dataframe.workspace = true
arrow2 = { workspace = true, features = ["io_ipc"] }
prost.workspace = true
thiserror.workspace = true
tonic.workspace = true

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tonic = { workspace = true, defauklt-features = false, features = [
"codegen",
"prost",
"transport",
] }

# Web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
tonic = { workspace = true, defauklt-features = false, features = [
"codegen",
"prost",
] }
tonic-web-wasm-client.workspace = true

[lints]
workspace = true
15 changes: 12 additions & 3 deletions crates/store/re_rrdp_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@ re_log.workspace = true
re_remote_store_types.workspace = true
re_smart_channel.workspace = true

anyhow.workspace = true
anyhow.workspace = true # TODO: use thiserror for everything
thiserror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
tokio-stream = { workspace = true }
tokio-stream.workspace = true

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true


# Web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
tonic-web-wasm-client.workspace = true
wasm-bindgen-futures.workspace = true
35 changes: 30 additions & 5 deletions crates/store/re_rrdp_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use re_remote_store_types::{
storage_node_client::StorageNodeClient, EncoderVersion, FetchRecordingRequest, RecordingId,
},
};
use tokio_stream::StreamExt;

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -94,7 +93,7 @@ pub fn stream_recording(
re_smart_channel::SmartChannelSource::RrdpStream { url: url.clone() },
);

tokio::spawn(async move {
spawn_future(async move {
if let Err(err) = stream_recording_async(tx, address, on_msg).await {
re_log::warn!(
"Failed to fetch whole recording from {url}: {}",
Expand All @@ -106,11 +105,29 @@ pub fn stream_recording(
Ok(rx)
}

#[cfg(target_arch = "wasm32")]
fn spawn_future<F>(future: F)
where
F: std::future::Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}

#[cfg(not(target_arch = "wasm32"))]
fn spawn_future<F>(future: F)
where
F: std::future::Future<Output = ()> + 'static + Send,
{
tokio::spawn(future);
}

async fn stream_recording_async(
tx: re_smart_channel::Sender<LogMsg>,
address: Address,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> anyhow::Result<()> {
use tokio_stream::StreamExt as _;

let Address {
addr_port,
recording_id,
Expand All @@ -119,9 +136,17 @@ async fn stream_recording_async(
let http_addr = format!("http://{addr_port}");
re_log::debug!("Connecting to {http_addr}…");

let mut client = StorageNodeClient::connect(http_addr)
.await?
.max_decoding_message_size(1024 * 1024 * 1024);
#[cfg(target_arch = "wasm32")]
let mut client = StorageNodeClient::new(tonic_web_wasm_client::Client::new_with_options(
http_addr,
tonic_web_wasm_client::options::FetchOptions::new()
.mode(tonic_web_wasm_client::options::Mode::Cors), // TODO: is this needed?
));

#[cfg(not(target_arch = "wasm32"))]
let mut client = StorageNodeClient::connect(http_addr).await?;

client = client.max_decoding_message_size(1024 * 1024 * 1024);

re_log::debug!("Fetching {recording_id}…");

Expand Down
5 changes: 4 additions & 1 deletion crates/top/rerun-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ rerun = { workspace = true, features = [
document-features.workspace = true
mimalloc = "0.1.43"

tokio = { workspace = true, optional = true, features = ["macros"] }
tokio = { workspace = true, optional = true, features = [
"macros",
"rt-multi-thread",
] }


[build-dependencies]
Expand Down
5 changes: 3 additions & 2 deletions crates/viewer/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ crate-type = ["cdylib", "rlib"]


[features]
default = ["analytics"]
default = ["analytics", "rrdp"] # TODO: not a default feature?

## Enable telemetry using our analytics SDK.
analytics = ["dep:re_analytics"]
Expand All @@ -40,7 +40,7 @@ analytics = ["dep:re_analytics"]
map_view = ["dep:re_space_view_map"]

## Enable the rrdp data source.
rrdp = ["re_data_source/rrdp"]
rrdp = ["re_data_source/rrdp", "dep:re_rrdp_comms"]


[dependencies]
Expand Down Expand Up @@ -91,6 +91,7 @@ re_ws_comms = { workspace = true, features = ["client"] }

# Internal (optional):
re_analytics = { workspace = true, optional = true }
re_rrdp_comms = { workspace = true, optional = true }
re_space_view_map = { workspace = true, optional = true }


Expand Down
8 changes: 8 additions & 0 deletions crates/viewer/re_viewer/src/web_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ enum EndpointCategory {
/// Could be a link to either an `.rrd` recording or a `.rbl` blueprint.
HttpRrd(String),

/// gRPC Rerun DataPlatform URL, e.g. `rrdp://ip:port/recording/1234`
DataPlatform(String),

/// A remote Rerun server.
WebSocket(String),

Expand All @@ -96,6 +99,8 @@ impl EndpointCategory {
fn categorize_uri(uri: String) -> Self {
if uri.starts_with("http") || uri.ends_with(".rrd") || uri.ends_with(".rbl") {
Self::HttpRrd(uri)
} else if uri.starts_with("rrdp://") {
Self::DataPlatform(uri)
} else if uri.starts_with("ws:") || uri.starts_with("wss:") {
Self::WebSocket(uri)
} else if uri.starts_with("web_event:") {
Expand Down Expand Up @@ -132,6 +137,9 @@ pub fn url_to_receiver(
Some(ui_waker),
),
),
EndpointCategory::DataPlatform(url) => {
re_rrdp_comms::stream_recording(url, Some(ui_waker)).map_err(|err| err.into())
}
EndpointCategory::WebEventListener(url) => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
Expand Down

0 comments on commit 39fc299

Please sign in to comment.