Skip to content

Commit ee7f208

Browse files
committed
Refactor the relationship between the servers and handles
1 parent 31237fe commit ee7f208

File tree

16 files changed

+225
-130
lines changed

16 files changed

+225
-130
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/re_viewer/src/web.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
148148
url = param.clone();
149149
}
150150
if url.is_empty() {
151-
re_ws_comms::default_server_url(&info.web_info.location.hostname)
151+
re_ws_comms::server_url(
152+
&info.web_info.location.hostname,
153+
re_ws_comms::DEFAULT_WS_SERVER_PORT,
154+
)
152155
} else {
153156
url
154157
}

crates/re_web_viewer_server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ ctrlc.workspace = true
5151
document-features = "0.2"
5252
futures-util = "0.3"
5353
hyper = { version = "0.14", features = ["full"] }
54+
thiserror.workspace = true
5455
tokio = { workspace = true, default-features = false, features = [
5556
"macros",
5657
"rt-multi-thread",

crates/re_web_viewer_server/src/lib.rs

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use std::task::{Context, Poll};
1212
use futures_util::future;
1313
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};
1414

15+
pub const DEFAULT_WEB_VIEWER_PORT: u16 = 9090;
16+
1517
#[cfg(not(feature = "__ci"))]
1618
mod data {
1719
// If you add/remove/change the paths here, also update the include-list in `Cargo.toml`!
@@ -32,6 +34,18 @@ mod data {
3234
pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm");
3335
}
3436

37+
#[derive(thiserror::Error, Debug)]
38+
pub enum WebViewerServerError {
39+
#[error("Could not parse address: {0}")]
40+
AddrParseFailed(#[from] std::net::AddrParseError),
41+
#[error("failed to bind to port {0}: {1}")]
42+
BindFailed(u16, hyper::Error),
43+
#[error("failed to join web viewer server task: {0}")]
44+
JoinError(#[from] tokio::task::JoinError),
45+
#[error("failed to serve web viewer: {0}")]
46+
ServeFailed(hyper::Error),
47+
}
48+
3549
struct Svc {
3650
// NOTE: Optional because it is possible to have the `analytics` feature flag enabled
3751
// while at the same time opting-out of analytics at run-time.
@@ -149,27 +163,72 @@ impl<T> Service<T> for MakeSvc {
149163

150164
// ----------------------------------------------------------------------------
151165

152-
/// Hosts the Web Viewer Wasm+HTML
166+
/// HTTP host for the Rerun Web Viewer application
167+
/// This serves the HTTP+WASM+JS files that make up the web-viewer.
153168
pub struct WebViewerServer {
154169
server: hyper::Server<AddrIncoming, MakeSvc>,
155170
}
156171

157172
impl WebViewerServer {
158-
pub fn new(port: u16) -> Self {
159-
let bind_addr = format!("0.0.0.0:{port}").parse().unwrap();
160-
let server = hyper::Server::bind(&bind_addr).serve(MakeSvc);
161-
Self { server }
173+
pub fn new(port: u16) -> Result<Self, WebViewerServerError> {
174+
let bind_addr = format!("0.0.0.0:{port}").parse()?;
175+
let server = hyper::Server::try_bind(&bind_addr)
176+
.map_err(|e| WebViewerServerError::BindFailed(port, e))?
177+
.serve(MakeSvc);
178+
Ok(Self { server })
162179
}
163180

164181
pub async fn serve(
165182
self,
166183
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
167-
) -> anyhow::Result<()> {
184+
) -> Result<(), WebViewerServerError> {
168185
self.server
169186
.with_graceful_shutdown(async {
170187
shutdown_rx.recv().await.ok();
171188
})
172-
.await?;
189+
.await
190+
.map_err(WebViewerServerError::ServeFailed)?;
173191
Ok(())
174192
}
175193
}
194+
195+
/// Sync handle for the [`WebViewerServer`]
196+
///
197+
/// When dropped, the server will be shut down.
198+
pub struct WebViewerServerHandle {
199+
port: u16,
200+
shutdown_tx: tokio::sync::broadcast::Sender<()>,
201+
}
202+
203+
impl Drop for WebViewerServerHandle {
204+
fn drop(&mut self) {
205+
re_log::info!("Shutting down web server on port {}.", self.port);
206+
self.shutdown_tx.send(()).ok();
207+
}
208+
}
209+
210+
impl WebViewerServerHandle {
211+
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
212+
///
213+
/// A port of 0 will let the OS choose a free port.
214+
///
215+
/// The caller needs to ensure that there is a `tokio` runtime running.
216+
pub fn new(requested_port: u16) -> Result<Self, WebViewerServerError> {
217+
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
218+
219+
let web_server = WebViewerServer::new(requested_port)?;
220+
221+
let port = web_server.server.local_addr().port();
222+
223+
tokio::spawn(async move { web_server.serve(shutdown_rx).await });
224+
225+
re_log::info!("Started web server on port {}.", port);
226+
227+
Ok(Self { port, shutdown_tx })
228+
}
229+
230+
/// Get the port where the web assets are hosted
231+
pub fn port(&self) -> u16 {
232+
self.port
233+
}
234+
}

crates/re_web_viewer_server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async fn main() {
1616
.expect("Error setting Ctrl-C handler");
1717

1818
re_web_viewer_server::WebViewerServer::new(port)
19+
.expect("Could not create web server")
1920
.serve(shutdown_rx)
2021
.await
2122
.unwrap();

crates/re_ws_comms/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ futures-util = { version = "0.3", optional = true, default-features = false, fea
5959
"std",
6060
] }
6161
parking_lot = { workspace = true, optional = true }
62+
thiserror.workspace = true
6263
tokio-tungstenite = { version = "0.17.1", optional = true }
6364
tokio = { workspace = true, optional = true, default-features = false, features = [
6465
"io-std",

crates/re_ws_comms/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub use client::Connection;
1212
#[cfg(feature = "server")]
1313
mod server;
1414
#[cfg(feature = "server")]
15-
pub use server::Server;
15+
pub use server::{RerunServer, RerunServerHandle};
1616

1717
use re_log_types::LogMsg;
1818

@@ -26,8 +26,8 @@ pub const PROTOCOL: &str = "wss";
2626
#[cfg(not(feature = "tls"))]
2727
pub const PROTOCOL: &str = "ws";
2828

29-
pub fn default_server_url(hostname: &str) -> String {
30-
format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}")
29+
pub fn server_url(hostname: &str, port: u16) -> String {
30+
format!("{PROTOCOL}://{hostname}:{port}")
3131
}
3232

3333
const PREFIX: [u8; 4] = *b"RR00";

crates/re_ws_comms/src/server.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,32 @@ use tokio_tungstenite::{accept_async, tungstenite::Error};
1616
use re_log_types::LogMsg;
1717
use re_smart_channel::Receiver;
1818

19+
use crate::server_url;
20+
1921
// ----------------------------------------------------------------------------
2022

21-
pub struct Server {
23+
#[derive(thiserror::Error, Debug)]
24+
pub enum RerunServerError {
25+
#[error("failed to bind to port {0}: {1}")]
26+
BindFailed(u16, std::io::Error),
27+
#[error("failed to join web viewer server task: {0}")]
28+
JoinError(#[from] tokio::task::JoinError),
29+
#[error("tokio error: {0}")]
30+
TokioIoError(#[from] tokio::io::Error),
31+
}
32+
33+
pub struct RerunServer {
2234
listener: TcpListener,
2335
}
2436

25-
impl Server {
37+
impl RerunServer {
2638
/// Start a pub-sub server listening on the given port
27-
pub async fn new(port: u16) -> anyhow::Result<Self> {
28-
use anyhow::Context as _;
29-
39+
pub async fn new(port: u16) -> Result<Self, RerunServerError> {
3040
let bind_addr = format!("0.0.0.0:{port}");
3141

3242
let listener = TcpListener::bind(&bind_addr)
3343
.await
34-
.with_context(|| format!("Can't listen on {bind_addr:?}"))?;
44+
.map_err(|e| RerunServerError::BindError(port, e))?;
3545

3646
re_log::info!(
3747
"Listening for websocket traffic on {bind_addr}. Connect with a web Rerun Viewer."
@@ -45,9 +55,7 @@ impl Server {
4555
self,
4656
rx: Receiver<LogMsg>,
4757
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
48-
) -> anyhow::Result<()> {
49-
use anyhow::Context as _;
50-
58+
) -> Result<(), RerunServerError> {
5159
let history = Arc::new(Mutex::new(Vec::new()));
5260

5361
let log_stream = to_broadcast_stream(rx, history.clone());
@@ -60,9 +68,7 @@ impl Server {
6068
}
6169
};
6270

63-
let peer = tcp_stream
64-
.peer_addr()
65-
.context("connected streams should have a peer address")?;
71+
let peer = tcp_stream.peer_addr()?;
6672
tokio::spawn(accept_connection(
6773
log_stream.clone(),
6874
peer,
@@ -73,6 +79,51 @@ impl Server {
7379
}
7480
}
7581

82+
pub struct RerunServerHandle {
83+
port: u16,
84+
shutdown_tx: tokio::sync::broadcast::Sender<()>,
85+
}
86+
87+
impl Drop for RerunServerHandle {
88+
fn drop(&mut self) {
89+
re_log::info!("Shutting down Rerun server on port {}.", self.port);
90+
self.shutdown_tx.send(()).ok();
91+
}
92+
}
93+
94+
impl RerunServerHandle {
95+
/// Create new [`RerunServer`] to relay [`LogMsg`]s to a web viewer.
96+
///
97+
/// A port of 0 will let the OS choose a free port.
98+
///
99+
/// The caller needs to ensure that there is a `tokio` runtime running.
100+
pub fn new(rerun_rx: Receiver<LogMsg>, requested_port: u16) -> Result<Self, RerunServerError> {
101+
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
102+
103+
let rt = tokio::runtime::Handle::current();
104+
105+
let ws_server = rt.block_on(tokio::spawn(async move {
106+
let ws_server = RerunServer::new(requested_port).await;
107+
ws_server
108+
}))??;
109+
110+
let port = ws_server.listener.local_addr()?.port();
111+
112+
tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });
113+
114+
Ok(Self { port, shutdown_tx })
115+
}
116+
117+
/// Get the port where the web assets are hosted
118+
pub fn port(&self) -> u16 {
119+
self.port
120+
}
121+
122+
pub fn server_url(&self) -> String {
123+
server_url("localhost", self.port)
124+
}
125+
}
126+
76127
fn to_broadcast_stream(
77128
log_rx: Receiver<LogMsg>,
78129
history: Arc<Mutex<Vec<Arc<[u8]>>>>,

crates/rerun/src/clap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl RerunArgs {
103103
#[cfg(feature = "web_viewer")]
104104
RerunBehavior::Serve => {
105105
let open_browser = true;
106-
crate::web_viewer::new_sink(open_browser)
106+
crate::web_viewer::new_sink(open_browser)?
107107
}
108108

109109
#[cfg(feature = "native_viewer")]

crates/rerun/src/run.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,15 @@ async fn run_impl(
303303
if args.web_viewer {
304304
#[cfg(feature = "web_viewer")]
305305
{
306-
let web_viewer =
307-
host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe());
306+
let web_port = re_web_viewer_server::DEFAULT_WEB_VIEWER_PORT;
307+
let web_viewer = host_web_viewer(
308+
web_port,
309+
true,
310+
rerun_server_ws_url,
311+
shutdown_rx.resubscribe(),
312+
);
313+
// We return here because the running [`WebViewerServer`] is all we need.
314+
// The page we open will be pointed at a websocket url hosted by a *different* server.
308315
return web_viewer.await;
309316
}
310317
#[cfg(not(feature = "web_viewer"))]
@@ -369,17 +376,23 @@ async fn run_impl(
369376
let shutdown_web_viewer = shutdown_rx.resubscribe();
370377

371378
// This is the server which the web viewer will talk to:
372-
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?;
379+
let ws_server =
380+
re_ws_comms::RerunServer::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?;
373381
let ws_server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server));
374-
let ws_server_url = re_ws_comms::default_server_url("127.0.0.1");
382+
let ws_server_url =
383+
re_ws_comms::server_url("127.0.0.1", re_ws_comms::DEFAULT_WS_SERVER_PORT);
375384

376385
// This is the server that serves the Wasm+HTML:
377-
let web_server_handle =
378-
tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer));
386+
let web_server_handle = tokio::spawn(host_web_viewer(
387+
re_web_viewer_server::DEFAULT_WEB_VIEWER_PORT,
388+
true,
389+
ws_server_url,
390+
shutdown_web_viewer,
391+
));
379392

380393
// Wait for both servers to shutdown.
381394
web_server_handle.await?.ok();
382-
return ws_server_handle.await?;
395+
return ws_server_handle.await?.map_err(anyhow::Error::from);
383396
}
384397

385398
#[cfg(not(feature = "web_viewer"))]

0 commit comments

Comments
 (0)