Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ The `l4book` subscription first sends a snapshot of the entire book and then for

```bash
cargo run --release --bin websocket_server -- --address 0.0.0.0 --port 8000
# With custom inactivity timeout (e.g., 30s):
cargo run --release --bin websocket_server -- --address 0.0.0.0 --port 8000 --inactivity-exit-secs 30
```

If this local server does not detect the node writing down any new events, it will automatically exit after some amount of time (currently set to 5 seconds).
If this local server does not detect the node writing down any new events, it will automatically exit after some amount of time (default 5 seconds; configurable via `--inactivity-exit-secs <secs>`).
In addition, the local server periodically fetches order book snapshots from the node, and compares to its own internal state. If a difference is detected, it will exit.

If you want logging, prepend the command with `RUST_LOG=info`.
Expand Down
9 changes: 8 additions & 1 deletion binaries/src/bin/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ struct Args {
/// documentation for <https://docs.rs/flate2/1.1.2/flate2/struct.Compression.html#method.new> for more info.
#[arg(long)]
websocket_compression_level: Option<u32>,

/// Inactivity timeout in seconds before server exits.
/// If no node events are observed for this duration, the process exits.
/// Default is 5 seconds to match README behavior.
#[arg(long)]
inactivity_exit_secs: Option<u64>,
}

#[tokio::main]
Expand All @@ -37,7 +43,8 @@ async fn main() -> Result<()> {
println!("Running websocket server on {full_address}");

let compression_level = args.websocket_compression_level.unwrap_or(/* Some compression */ 1);
run_websocket_server(&full_address, true, compression_level).await?;
let inactivity_exit_secs = args.inactivity_exit_secs.unwrap_or(5);
run_websocket_server(&full_address, true, compression_level, inactivity_exit_secs).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions server/src/listeners/order_book/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod utils;

// WARNING - this code assumes no other file system operations are occurring in the watched directories
// if there are scripts running, this may not work as intended
pub(crate) async fn hl_listen(listener: Arc<Mutex<OrderBookListener>>, dir: PathBuf) -> Result<()> {
pub(crate) async fn hl_listen(listener: Arc<Mutex<OrderBookListener>>, dir: PathBuf, inactivity_exit_secs: u64) -> Result<()> {
let order_statuses_dir = EventSource::OrderStatuses.event_source_dir(&dir).canonicalize()?;
let fills_dir = EventSource::Fills.event_source_dir(&dir).canonicalize()?;
let order_diffs_dir = EventSource::OrderDiffs.event_source_dir(&dir).canonicalize()?;
Expand Down Expand Up @@ -122,7 +122,7 @@ pub(crate) async fn hl_listen(listener: Arc<Mutex<OrderBookListener>>, dir: Path
let snapshot_fetch_task_tx = snapshot_fetch_task_tx.clone();
fetch_snapshot(dir.clone(), listener, snapshot_fetch_task_tx, ignore_spot);
}
() = sleep(Duration::from_secs(5)) => {
() = sleep(Duration::from_secs(inactivity_exit_secs)) => {
let listener = listener.lock().await;
if listener.is_ready() {
return Err(format!("Stream has fallen behind ({HL_NODE} failed?)").into());
Expand Down
4 changes: 2 additions & 2 deletions server/src/servers/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::{
};
use yawc::{FrameView, OpCode, WebSocket};

pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_level: u32) -> Result<()> {
pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_level: u32, inactivity_exit_secs: u64) -> Result<()> {
let (internal_message_tx, _) = channel::<Arc<InternalMessage>>(100);

// Central task: listen to messages and forward them for distribution
Expand All @@ -42,7 +42,7 @@ pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_
{
let listener = listener.clone();
tokio::spawn(async move {
if let Err(err) = hl_listen(listener, home_dir).await {
if let Err(err) = hl_listen(listener, home_dir, inactivity_exit_secs).await {
error!("Listener fatal error: {err}");
std::process::exit(1);
}
Expand Down