Skip to content
Open
4 changes: 2 additions & 2 deletions cli/planoai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def main(ctx, version):

@click.command()
def build():
"""Build Arch from source. Works from any directory within the repo."""
"""Build Plano from source. Works from any directory within the repo."""

# Find the repo root
repo_root = find_repo_root()
Expand Down Expand Up @@ -112,7 +112,7 @@ def build():
],
check=True,
)
click.echo("archgw image built successfully.")
click.echo("plano image built successfully.")
except subprocess.CalledProcessError as e:
click.echo(f"Error building plano image: {e}")
sys.exit(1)
Expand Down
1 change: 1 addition & 0 deletions crates/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/brightstaff/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ reqwest = { version = "0.12.15", features = ["stream"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_with = "3.13.0"
strsim = "0.11"
serde_yaml = "0.9.34"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full"] }
Expand Down
10 changes: 9 additions & 1 deletion crates/brightstaff/src/handlers/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ pub async fn llm_chat(
.get_recent_user_message()
.map(|msg| truncate_message(&msg, 50));

// Extract messages for signal analysis (clone before moving client_request)
let messages_for_signals = client_request.get_messages();

client_request.set_model(resolved_model.clone());
if client_request.remove_metadata_key("archgw_preference_config") {
debug!(
Expand Down Expand Up @@ -287,13 +290,18 @@ pub async fn llm_chat(
.await;

// Create base processor for metrics and tracing
let base_processor = ObservableStreamProcessor::new(
let mut base_processor = ObservableStreamProcessor::new(
trace_collector,
operation_component::LLM,
llm_span,
request_start_time,
);

// Add messages for signal analysis if available
if !messages_for_signals.is_empty() {
base_processor = base_processor.with_messages(messages_for_signals);
}

// === v1/responses state management: Wrap with ResponsesStateProcessor ===
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
let streaming_response = if let (true, false, Some(state_store)) = (
Expand Down
102 changes: 100 additions & 2 deletions crates/brightstaff/src/handlers/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::warn;

// Import tracing constants
use crate::tracing::{error, llm};
// Import tracing constants and signals
use crate::signals::{InteractionQuality, SignalAnalyzer, FLAG_MARKER};
use crate::tracing::{error, llm, signals as signal_constants};
use hermesllm::apis::openai::Message;

/// Trait for processing streaming chunks
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
Expand All @@ -38,6 +40,7 @@ pub struct ObservableStreamProcessor {
chunk_count: usize,
start_time: Instant,
time_to_first_token: Option<u128>,
messages: Option<Vec<Message>>,
}

impl ObservableStreamProcessor {
Expand All @@ -62,8 +65,15 @@ impl ObservableStreamProcessor {
chunk_count: 0,
start_time,
time_to_first_token: None,
messages: None,
}
}

/// Set the conversation messages for signal analysis
pub fn with_messages(mut self, messages: Vec<Message>) -> Self {
self.messages = Some(messages);
self
}
}

impl StreamProcessor for ObservableStreamProcessor {
Expand Down Expand Up @@ -133,6 +143,94 @@ impl StreamProcessor for ObservableStreamProcessor {
}
}

// Analyze signals if messages are available and add to span attributes
if let Some(ref messages) = self.messages {
let analyzer = SignalAnalyzer::new();
let report = analyzer.analyze(messages);

// Add overall quality
self.span.attributes.push(Attribute {
key: signal_constants::QUALITY.to_string(),
value: AttributeValue {
string_value: Some(format!("{:?}", report.overall_quality)),
},
});

// Add repair/follow-up metrics if concerning
if report.follow_up.is_concerning || report.follow_up.repair_count > 0 {
self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.follow_up.repair_count.to_string()),
},
});

self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_RATIO.to_string(),
value: AttributeValue {
string_value: Some(format!("{:.3}", report.follow_up.repair_ratio)),
},
});
}

// Add flag marker to operation name if any concerning signal is detected
let should_flag = report.frustration.has_frustration
|| report.repetition.has_looping
|| report.escalation.escalation_requested
|| matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Severe
);

if should_flag {
// Prepend flag marker to the operation name
self.span.name = format!("{} {}", self.span.name, FLAG_MARKER);
}

// Add key signal metrics
if report.frustration.has_frustration {
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.frustration_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_SEVERITY.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.severity.to_string()),
},
});
}

if report.repetition.has_looping {
self.span.attributes.push(Attribute {
key: signal_constants::REPETITION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.repetition.repetition_count.to_string()),
},
});
}

if report.escalation.escalation_requested {
self.span.attributes.push(Attribute {
key: signal_constants::ESCALATION_REQUESTED.to_string(),
value: AttributeValue {
string_value: Some("true".to_string()),
},
});
}

if report.positive_feedback.has_positive_feedback {
self.span.attributes.push(Attribute {
key: signal_constants::POSITIVE_FEEDBACK_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.positive_feedback.positive_count.to_string()),
},
});
}
}

// Record the finalized span
self.collector
.record_span(&self.service_name, self.span.clone());
Expand Down
1 change: 1 addition & 0 deletions crates/brightstaff/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod handlers;
pub mod router;
pub mod signals;
pub mod state;
pub mod tracing;
pub mod utils;
Loading