Skip to content

Commit fd1d5cb

Browse files
committed
add ready check
1 parent 41ff461 commit fd1d5cb

File tree

4 files changed

+77
-14
lines changed

4 files changed

+77
-14
lines changed

crates/arkflow-core/src/engine/mod.rs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414

1515
use crate::config::EngineConfig;
16+
use crate::input::Input;
17+
use crate::output::Output;
1618
use std::process;
1719
use std::sync::atomic::{AtomicBool, Ordering};
1820
use std::sync::Arc;
@@ -29,10 +31,20 @@ use axum::{routing::get, Router};
2931
use serde::Serialize;
3032
use tokio::net::TcpListener;
3133

34+
/// Holds references to stream components for health checking
35+
struct StreamComponents {
36+
input: Arc<dyn Input>,
37+
output: Arc<dyn Output>,
38+
error_output: Option<Arc<dyn Output>>,
39+
}
40+
41+
struct EngineApiState {
42+
health_state: Arc<HealthState>,
43+
components: Vec<StreamComponents>,
44+
}
45+
3246
/// Health check status
3347
struct HealthState {
34-
/// Whether the engine has been initialized
35-
is_ready: AtomicBool,
3648
/// Whether the engine is currently running
3749
is_running: AtomicBool,
3850
}
@@ -82,7 +94,6 @@ impl Engine {
8294
Self {
8395
config,
8496
health_state: Arc::new(HealthState {
85-
is_ready: AtomicBool::new(false),
8697
is_running: AtomicBool::new(false),
8798
}),
8899
}
@@ -99,6 +110,7 @@ impl Engine {
99110
async fn start_health_check_server(
100111
&self,
101112
cancellation_token: CancellationToken,
113+
components: Vec<StreamComponents>,
102114
) -> Result<(), Box<dyn std::error::Error>> {
103115
let health_check = &self.config.health_check;
104116

@@ -113,7 +125,10 @@ impl Engine {
113125
.route(&*health_check.health_path, get(Self::handle_health))
114126
.route(&*health_check.readiness_path, get(Self::handle_readiness))
115127
.route(&*health_check.liveness_path, get(Self::handle_liveness))
116-
.with_state(health_state);
128+
.with_state(Arc::new(EngineApiState {
129+
health_state,
130+
components,
131+
}));
117132

118133
let addr = &health_check.address;
119134
let addr = addr.clone();
@@ -149,8 +164,8 @@ impl Engine {
149164
///
150165
/// # Arguments
151166
/// * `state` - The shared health state containing running status
152-
async fn handle_health(State(state): State<Arc<HealthState>>) -> impl IntoResponse {
153-
let is_running = state.is_running.load(Ordering::SeqCst);
167+
async fn handle_health(State(state): State<Arc<EngineApiState>>) -> impl IntoResponse {
168+
let is_running = state.health_state.is_running.load(Ordering::SeqCst);
154169
let status = if is_running { "healthy" } else { "unhealthy" };
155170

156171
let response = HealthResponse {
@@ -174,8 +189,26 @@ impl Engine {
174189
///
175190
/// # Arguments
176191
/// * `state` - The shared health state containing readiness status
177-
async fn handle_readiness(State(state): State<Arc<HealthState>>) -> impl IntoResponse {
178-
let is_ready = state.is_ready.load(Ordering::SeqCst);
192+
async fn handle_readiness(State(state): State<Arc<EngineApiState>>) -> impl IntoResponse {
193+
let mut is_ready = true;
194+
195+
for component in state.components.iter() {
196+
if !component.input.check_ready().await.is_ok() {
197+
is_ready = false;
198+
}
199+
if !component.output.check_ready().await.is_ok() {
200+
is_ready = false;
201+
}
202+
if let Some(error_output) = component.error_output.as_ref() {
203+
if !error_output.check_ready().await.is_ok() {
204+
is_ready = false;
205+
}
206+
}
207+
if !is_ready {
208+
break;
209+
}
210+
}
211+
179212
let status = if is_ready { "ready" } else { "not ready" };
180213

181214
let response = ReadinessResponse {
@@ -198,7 +231,7 @@ impl Engine {
198231
///
199232
/// # Arguments
200233
/// * `_` - Unused health state parameter
201-
async fn handle_liveness(_: State<Arc<HealthState>>) -> impl IntoResponse {
234+
async fn handle_liveness(_: State<Arc<EngineApiState>>) -> impl IntoResponse {
202235
// As long as the server can respond, it is considered alive
203236
let response = LivenessResponse {
204237
status: "alive".to_string(),
@@ -220,9 +253,6 @@ impl Engine {
220253
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
221254
let token = CancellationToken::new();
222255

223-
// Start the health check server
224-
self.start_health_check_server(token.clone()).await?;
225-
226256
// Create and run all flows
227257
let mut streams = Vec::new();
228258
let mut handles = Vec::new();
@@ -241,8 +271,19 @@ impl Engine {
241271
}
242272
}
243273

244-
// Set the readiness status
245-
self.health_state.is_ready.store(true, Ordering::SeqCst);
274+
let components = streams
275+
.iter()
276+
.map(|stream| StreamComponents {
277+
input: stream.get_input(),
278+
output: stream.get_output(),
279+
error_output: stream.get_error_output(),
280+
})
281+
.collect();
282+
283+
// Start the health check server
284+
self.start_health_check_server(token.clone(), components)
285+
.await?;
286+
246287
// Set up signal handlers
247288
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
248289
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");

crates/arkflow-core/src/input/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ pub trait Input: Send + Sync {
5151

5252
/// Close the input source connection
5353
async fn close(&self) -> Result<(), Error>;
54+
55+
/// Check if the input source is ready and healthy
56+
async fn check_ready(&self) -> Result<(), Error> {
57+
Ok(())
58+
}
5459
}
5560

5661
pub struct NoopAck;

crates/arkflow-core/src/output/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ pub trait Output: Send + Sync {
3737

3838
/// Close the output destination connection
3939
async fn close(&self) -> Result<(), Error>;
40+
41+
/// Check if the output destination is ready and healthy
42+
async fn check_ready(&self) -> Result<(), Error> {
43+
Ok(())
44+
}
4045
}
4146

4247
/// Output configuration

crates/arkflow-core/src/stream/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,18 @@ impl Stream {
425425

426426
Ok(())
427427
}
428+
429+
pub fn get_input(&self) -> Arc<dyn Input> {
430+
self.input.clone()
431+
}
432+
433+
pub fn get_output(&self) -> Arc<dyn Output> {
434+
self.output.clone()
435+
}
436+
437+
pub fn get_error_output(&self) -> Option<Arc<dyn Output>> {
438+
self.error_output.clone()
439+
}
428440
}
429441

430442
/// Stream configuration

0 commit comments

Comments
 (0)