Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
resolver = "2"

members = [
"hypersync-client",
"hypersync-format",
"hypersync-net-types",
"hypersync-schema",
"examples/all_erc20",
"examples/wallet",
"examples/watch",
"examples/reverse_wallet",
"examples/call_watch",
"examples/call_decode_output",
"hypersync-client",
"hypersync-format",
"hypersync-net-types",
"hypersync-schema",
"examples/all_erc20",
"examples/wallet",
"examples/watch",
"examples/reverse_wallet",
"examples/call_watch",
"examples/call_decode_output",
"examples/height_stream",
"examples/height_stream_reconnect",
]
10 changes: 10 additions & 0 deletions examples/height_stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "height_stream"
version = "0.1.0"
edition = "2021"

[dependencies]
hypersync-client = { path = "../../hypersync-client" }
tokio = { version = "1", features = ["full"] }
env_logger = "0.11"
anyhow = "1"
34 changes: 34 additions & 0 deletions examples/height_stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::sync::Arc;

use anyhow::Result;
use hypersync_client::{Client, ClientConfig};

#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();

let client = Arc::new(Client::new(ClientConfig {
url: Some(
"https://arbitrum-sepolia.zone1.hypersync.xyz"
.parse()
.unwrap(),
),
..Default::default()
})?);

let mut rx = client.clone().stream_height().await?;

println!("listening for height updates... (Ctrl+C to quit)");

while let Some(msg) = rx.recv().await {
match msg {
Ok(height) => println!("height: {}", height),
Err(e) => {
eprintln!("stream error - will automatically reconnect: {e:?}");
break;
}
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion hypersync-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ hypersync-schema = { path = "../hypersync-schema", version = "0.3" }
[dependencies.reqwest]
version = "0.12"
default-features = false
features = ["json", "rustls-tls"]
features = ["json", "rustls-tls", "stream"]

[dev-dependencies]
maplit = "1"
Expand Down
271 changes: 271 additions & 0 deletions hypersync-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
use std::{num::NonZeroU64, sync::Arc, time::Duration};

use anyhow::{anyhow, Context, Result};
use futures::StreamExt;
use hypersync_net_types::{ArchiveHeight, ChainId, Query};
use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
use reqwest::header;
use reqwest::Method;
use serde::Deserialize;

mod column_mapping;
mod config;
Expand Down Expand Up @@ -542,6 +545,274 @@ impl Client {
}
}

#[derive(Debug, Deserialize)]
struct HeightSsePayloadJson {
height: Option<u64>,
}

const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30);
const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);

impl Client {
/// Streams latest archive height updates from the server using the `/height/sse` SSE endpoint.
///
/// # Overview
/// This function establishes a long-lived Server-Sent Events (SSE) connection to continuously
/// receive height updates from the hypersync server. The connection is resilient and will
/// automatically reconnect if it drops due to network issues, server restarts, or shutdowns.
///
/// # Returns
/// Returns a channel receiver that yields `Result<u64>` heights. A background task manages
/// the connection lifecycle and sends height updates through this channel.
///
/// # Connection Management
/// - **Automatic Reconnection**: If the connection drops, the client automatically attempts
/// to reconnect with exponential backoff (1s → 2s → 4s → ... → max 30s)
/// - **Graceful Shutdown**: When the server closes the stream (e.g., during restart), the
/// client detects it and reconnects immediately
/// - **Error Handling**: Connection errors are logged and don't terminate the stream
///
/// # SSE Protocol Details
/// The function parses SSE messages according to the W3C EventSource spec:
/// - Messages are separated by blank lines (`\n\n`)
/// - Each message can have `event:` and `data:` fields
/// - Keep-alive comments (`:ping`) are ignored
/// - Only `event:height` messages are processed
///
/// # Example
/// ```no_run
/// # use std::sync::Arc;
/// # use hypersync_client::{Client, ClientConfig};
/// # async fn example() -> anyhow::Result<()> {
/// let client = Arc::new(Client::new(ClientConfig {
/// url: Some("https://eth.hypersync.xyz".parse()?),
/// ..Default::default()
/// })?);
///
/// let mut rx = client.stream_height().await?;
///
/// while let Some(result) = rx.recv().await {
/// match result {
/// Ok(height) => println!("Height: {}", height),
/// Err(e) => eprintln!("Error: {}", e),
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub async fn stream_height(self: Arc<Self>) -> Result<mpsc::Receiver<Result<u64>>> {
// Create a channel for sending height updates from the background task to the caller.
// Buffer size of 16 allows for some burst handling without blocking the sender.
let (tx, rx) = mpsc::channel(16);
let client = self.clone();

// Spawn a background task that manages the SSE connection lifecycle.
// This task runs indefinitely, handling reconnections automatically.
tokio::spawn(async move {
// Reconnection delay starts at 1 second and doubles on each failure (exponential backoff).
// This prevents hammering the server when it's down or restarting.
let mut reconnect_delay = INITIAL_RECONNECT_DELAY;

// Main reconnection loop - runs forever, attempting to maintain a connection.
loop {
// === STEP 1: Build the SSE endpoint URL ===
// Construct the full URL path: <base_url>/height/sse
let mut url = client.url.clone();
let mut segments = url.path_segments_mut().ok().unwrap();
segments.push("height");
segments.push("sse");
std::mem::drop(segments); // Release the mutable borrow on url

// === STEP 2: Prepare the HTTP GET request ===
let mut req = client.http_client.request(Method::GET, url);

// Add bearer token authentication if configured
if let Some(bearer_token) = &client.bearer_token {
req = req.bearer_auth(bearer_token);
}

// Configure request headers and timeout.
// SSE connections are long-lived, so we use a 24-hour timeout to prevent
// the HTTP client from terminating the connection prematurely.
req = req
.header(header::ACCEPT, "text/event-stream")
.timeout(MAX_CONNECTION_AGE);

// === STEP 3: Attempt to establish the SSE connection ===
match req.send().await {
Ok(res) => {
let status = res.status();

// Check for HTTP errors (non-2xx status codes)
if !status.is_success() {
log::warn!("❌ HTTP error: status code {}", status);

// Wait before retrying with exponential backoff
tokio::time::sleep(reconnect_delay).await;
reconnect_delay =
std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY);
continue; // Retry the connection
}

// Successfully connected!
log::info!("✅ Connected to height SSE stream");

// Reset reconnection delay after successful connection
reconnect_delay = INITIAL_RECONNECT_DELAY;

// === STEP 4: Process the SSE byte stream ===
// Get the response body as a stream of bytes
let mut byte_stream = res.bytes_stream();

// Buffer for accumulating incomplete SSE messages.
// SSE messages are text-based and separated by blank lines (\n\n).
let mut buf = String::new();

// Flag to track if the connection is still active
let mut connection_active = true;

// Main message processing loop - runs until the connection drops
while connection_active {
match byte_stream.next().await {
// === Successfully received bytes from the stream ===
Some(Ok(bytes)) => {
log::trace!(
"📦 Received {} bytes from SSE stream",
bytes.len()
);

use std::fmt::Write as _;

// Convert bytes to UTF-8 string
let chunk_str = match std::str::from_utf8(&bytes) {
Ok(s) => s,
Err(_) => {
// Handle invalid UTF-8 by doing lossy conversion.
// This is rare but can happen with network corruption.
let mut tmp = String::with_capacity(bytes.len() * 2);
for &b in bytes.as_ref() {
let _ = write!(&mut tmp, "{}", char::from(b));
}
buf.push_str(&tmp);
continue;
}
};

// Append the new chunk to our buffer
buf.push_str(chunk_str);

// === STEP 5: Parse complete SSE messages ===
// SSE messages are separated by blank lines (\n\n).
// Process all complete messages currently in the buffer.
loop {
if let Some(idx) = buf.find("\n\n") {
// Extract one complete SSE message
let event_block = buf[..idx].to_string();
buf.drain(..idx + 2); // Remove message + blank line from buffer

// Parse the SSE event fields according to the W3C spec
let mut event_name: Option<&str> = None;
let mut data_lines: Vec<&str> = Vec::new();

// Process each line in the event block
for line in event_block.lines() {
if line.is_empty() {
continue;
}
if line.starts_with(':') {
// Comment line (used for keep-alive pings).
// Format: ": ping" or ": <comment text>"
continue;
}
if let Some(rest) = line.strip_prefix("event:") {
// Event type field.
// Format: "event: height"
event_name = Some(rest.trim());
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
// Data field (can be multiple per event).
// Format: "data: 12345"
data_lines.push(rest.trim());
continue;
}
// Ignore other SSE fields like "id:" and "retry:"
}

// === STEP 6: Process height events ===
let name = event_name.unwrap_or("");
if name == "height" {
// Combine multiple data lines (though typically just one)
let data = data_lines.join("\n");

// Try parsing as plain integer (preferred format).
// Server sends: event:height\ndata:12345\n\n
if let Ok(h) = data.trim().parse::<u64>() {
log::debug!("📈 Height update: {}", h);

// Send the height through the channel.
// If the receiver is dropped, exit the task gracefully.
if tx.send(Ok(h)).await.is_err() {
log::info!(
"Receiver dropped, exiting stream task"
);
return;
}
} else {
log::warn!(
"❌ Failed to parse height: {}",
data
);
connection_active = false;
continue;
}
}
} else {
// No complete message in buffer yet, wait for more data
break;
}
}
}

// === Stream error occurred (network issue, timeout, etc.) ===
Some(Err(e)) => {
log::warn!("⚠️ SSE stream error: {:?}", e);
connection_active = false; // Exit loop and reconnect
}

// === Stream ended (server closed the connection) ===
// This happens during server restarts, shutdowns, or SIGTERM simulation
None => {
log::info!("🔌 SSE stream closed by server, will reconnect");
connection_active = false; // Exit loop and reconnect
}
}
}
}

// === Failed to establish HTTP connection ===
Err(e) => {
log::warn!("❌ Failed to connect to height stream: {:?}", e);
}
}

// === STEP 7: Wait before reconnecting ===
// After any disconnection (graceful or error), wait before attempting to reconnect.
// This implements exponential backoff to avoid overwhelming the server.
log::info!("⏳ Reconnecting in {:?}...", reconnect_delay);
tokio::time::sleep(reconnect_delay).await;

// Double the delay for the next attempt, up to the maximum.
// Pattern: 0.5s → 1s → 2s → 4s → 8s → 16s → 30s (max) → 30s → ...
reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY);
}
});

Ok(rx)
}
}

fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
if config.event_signature.is_some() {
return Err(anyhow!(
Expand Down
Loading