Skip to content

UI shows when we are waiting / throttling to avoid rate-limit #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/code_assistant/src/session/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ impl SessionInstance {
/// The dummy UI never requests cancellation, so streaming always continues.
fn should_streaming_continue(&self) -> bool {
true
}
/// Rate-limit countdown notifications are intentionally ignored.
fn notify_rate_limit(&self, _seconds_remaining: u64) {
// No-op for dummy UI
}
/// Clearing the rate-limit indicator is intentionally a no-op here.
fn clear_rate_limit(&self) {
// No-op for dummy UI
}
}

let dummy_ui = std::sync::Arc::new(Box::new(DummyUI) as Box<dyn crate::ui::UserInterface>);
Expand Down Expand Up @@ -416,4 +422,18 @@ impl UserInterface for ProxyUI {
true // Don't interrupt streaming if session is not connected
}
}

/// Forward a rate-limit countdown to the real UI, but only while the
/// session is still connected; otherwise the notification is dropped.
fn notify_rate_limit(&self, seconds_remaining: u64) {
if !self.is_connected() {
// Session disconnected — nothing to notify.
return;
}
self.real_ui.notify_rate_limit(seconds_remaining);
}

/// Tell the real UI to dismiss its rate-limit indicator, but only while
/// the session is still connected; otherwise the request is dropped.
fn clear_rate_limit(&self) {
if !self.is_connected() {
// Session disconnected — nothing to clear.
return;
}
self.real_ui.clear_rate_limit();
}
}
8 changes: 8 additions & 0 deletions crates/code_assistant/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ impl UserInterface for MockUI {
// Mock implementation always continues streaming
true
}

/// Mock: rate-limit notifications are not recorded or acted upon.
fn notify_rate_limit(&self, _seconds_remaining: u64) {
// Mock implementation does nothing with rate limit notifications
}

/// Mock: clearing the rate-limit indicator is a no-op.
fn clear_rate_limit(&self) {
// Mock implementation does nothing with rate limit clearing
}
}

// Mock Explorer
Expand Down
13 changes: 13 additions & 0 deletions crates/code_assistant/src/ui/gpui/elements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct MessageContainer {
/// Set to `true` in UiEvent::StreamingStarted, and set to false when the first streaming chunk
/// is received. A progress spinner is showing while it is `true`.
waiting_for_content: Arc<Mutex<bool>>,
/// Rate limit countdown in seconds (None = no rate limiting)
rate_limit_countdown: Arc<Mutex<Option<u64>>>,
}

impl MessageContainer {
Expand All @@ -41,6 +43,7 @@ impl MessageContainer {
role,
current_request_id: Arc::new(Mutex::new(0)),
waiting_for_content: Arc::new(Mutex::new(false)),
rate_limit_countdown: Arc::new(Mutex::new(None)),
}
}

Expand All @@ -59,6 +62,16 @@ impl MessageContainer {
*self.waiting_for_content.lock().unwrap()
}

/// Update the rate-limit countdown shown for this message:
/// `Some(seconds)` while the backend is throttled, `None` once cleared.
pub fn set_rate_limit_countdown(&self, seconds: Option<u64>) {
let mut countdown = self.rate_limit_countdown.lock().unwrap();
*countdown = seconds;
}

/// Current rate-limit countdown, if any (`None` = not rate limited).
pub fn get_rate_limit_countdown(&self) -> Option<u64> {
let countdown = self.rate_limit_countdown.lock().unwrap();
*countdown
}

// Remove all blocks with the given request ID
// Used when the user cancels a request while it is streaming
pub fn remove_blocks_with_request_id(&self, request_id: u64, cx: &mut Context<Self>) {
Expand Down
17 changes: 14 additions & 3 deletions crates/code_assistant/src/ui/gpui/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ impl Render for MessagesView {

// Add loading indicator if waiting for content
if msg.read(cx).is_waiting_for_content() {
let rate_limit_countdown = msg.read(cx).get_rate_limit_countdown();

let (message_text, icon_color) = if let Some(seconds) = rate_limit_countdown {
(
format!("Rate limited - retrying in {}s...", seconds),
cx.theme().warning,
)
} else {
("Waiting for response...".to_string(), cx.theme().info)
};

container_children.push(
div()
.flex()
Expand All @@ -113,7 +124,7 @@ impl Render for MessagesView {
svg()
.size(px(16.))
.path(SharedString::from("icons/arrow_circle.svg"))
.text_color(cx.theme().info)
.text_color(icon_color)
.with_animation(
"loading_indicator",
Animation::new(std::time::Duration::from_secs(2))
Expand All @@ -128,9 +139,9 @@ impl Render for MessagesView {
)
.child(
div()
.text_color(cx.theme().info)
.text_color(icon_color)
.text_size(px(12.))
.child("Waiting for response..."),
.child(message_text),
)
.into_any_element(),
);
Expand Down
33 changes: 33 additions & 0 deletions crates/code_assistant/src/ui/gpui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,31 @@ impl Gpui {
warn!("UI: No backend event sender available");
}
}
UiEvent::RateLimitNotification { seconds_remaining } => {
debug!(
"UI: RateLimitNotification event: {} seconds remaining",
seconds_remaining
);
let queue = self.message_queue.lock().unwrap();
if let Some(last) = queue.last() {
cx.update_entity(&last, |message, cx| {
message.set_rate_limit_countdown(Some(seconds_remaining));
cx.notify();
})
.expect("Failed to update entity");
}
}
UiEvent::ClearRateLimit => {
debug!("UI: ClearRateLimit event");
let queue = self.message_queue.lock().unwrap();
if let Some(last) = queue.last() {
cx.update_entity(&last, |message, cx| {
message.set_rate_limit_countdown(None);
cx.notify();
})
.expect("Failed to update entity");
}
}
}
}

Expand Down Expand Up @@ -907,4 +932,12 @@ impl UserInterface for Gpui {
_ => true,
}
}

/// Queue a `RateLimitNotification` UI event carrying the remaining
/// wait time; the event loop applies it to the latest message.
fn notify_rate_limit(&self, seconds_remaining: u64) {
self.push_event(UiEvent::RateLimitNotification { seconds_remaining });
}

/// Queue a `ClearRateLimit` UI event to dismiss the countdown indicator.
fn clear_rate_limit(&self) {
self.push_event(UiEvent::ClearRateLimit);
}
}
4 changes: 4 additions & 0 deletions crates/code_assistant/src/ui/gpui/ui_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ pub enum UiEvent {
ClearMessages,
/// Send user message to active session (triggers agent)
SendUserMessage { message: String, session_id: String },
/// Notify about rate limiting with countdown
RateLimitNotification { seconds_remaining: u64 },
/// Clear rate limit notification
ClearRateLimit,
}
6 changes: 6 additions & 0 deletions crates/code_assistant/src/ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ pub trait UserInterface: Send + Sync {

/// Check if streaming should continue
fn should_streaming_continue(&self) -> bool;

/// Notify the UI about rate limiting and countdown
fn notify_rate_limit(&self, seconds_remaining: u64);

/// Clear rate limit notification
fn clear_rate_limit(&self);
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions crates/code_assistant/src/ui/streaming/json_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ impl StreamProcessorTrait for JsonStreamProcessor {
.ui
.display_fragment(&DisplayFragment::ThinkingText(text.clone())),

StreamingChunk::RateLimit { seconds_remaining } => {
// Notify UI about rate limit with countdown
self.ui.notify_rate_limit(*seconds_remaining);
Ok(())
}

StreamingChunk::RateLimitClear => {
// Clear rate limit notification
self.ui.clear_rate_limit();
Ok(())
}

StreamingChunk::InputJson {
content,
tool_name,
Expand Down
8 changes: 8 additions & 0 deletions crates/code_assistant/src/ui/streaming/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ impl UserInterface for TestUI {
// Test implementation always continues streaming
true
}

/// Test double: rate-limit notifications are discarded.
fn notify_rate_limit(&self, _seconds_remaining: u64) {
// Test implementation does nothing with rate limit notifications
}

/// Test double: clearing the rate-limit indicator is a no-op.
fn clear_rate_limit(&self) {
// Test implementation does nothing with rate limit clearing
}
}

/// Helper function to split text into small chunks for testing tag handling
Expand Down
12 changes: 12 additions & 0 deletions crates/code_assistant/src/ui/streaming/xml_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ impl StreamProcessorTrait for XmlStreamProcessor {
.ui
.display_fragment(&DisplayFragment::ThinkingText(text.clone())),

StreamingChunk::RateLimit { seconds_remaining } => {
// Notify UI about rate limit with countdown
self.ui.notify_rate_limit(*seconds_remaining);
Ok(())
}

StreamingChunk::RateLimitClear => {
// Clear rate limit notification
self.ui.clear_rate_limit();
Ok(())
}

// For native JSON input, handle based on tool information
StreamingChunk::InputJson {
content,
Expand Down
10 changes: 10 additions & 0 deletions crates/code_assistant/src/ui/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,14 @@ impl UserInterface for TerminalUI {
// Terminal UI always continues streaming (no stop functionality)
true
}

/// Print the rate-limit countdown to stdout.
///
/// NOTE(review): if the caller re-notifies every second during the
/// countdown, this prints one new line per tick rather than updating
/// in place — confirm whether that is the intended terminal behavior.
fn notify_rate_limit(&self, seconds_remaining: u64) {
// For terminal UI, we could print immediately, but let's keep it simple
// In a real implementation, this might use a channel to communicate with the terminal
println!("Rate limited - retrying in {}s...", seconds_remaining);
}

/// Nothing to dismiss: the countdown was printed as plain lines,
/// so clearing the rate-limit state requires no terminal action.
fn clear_rate_limit(&self) {
// No action needed for terminal UI
}
}
1 change: 1 addition & 0 deletions crates/llm/src/aicore_converse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ impl AiCoreClient {
&e,
attempts,
max_retries,
streaming_callback,
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions crates/llm/src/aicore_invoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl AiCoreClient {
&e,
attempts,
max_retries,
streaming_callback,
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions crates/llm/src/anthropic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ impl AnthropicClient {
&e,
attempts,
max_retries,
streaming_callback,
)
.await
{
Expand Down
4 changes: 4 additions & 0 deletions crates/llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub enum StreamingChunk {
tool_name: Option<String>,
tool_id: Option<String>,
},
/// Rate limit notification with countdown in seconds
RateLimit { seconds_remaining: u64 },
/// Clear rate limit notification
RateLimitClear,
}

pub type StreamingCallback = Box<dyn Fn(&StreamingChunk) -> Result<()> + Send + Sync>;
Expand Down
1 change: 1 addition & 0 deletions crates/llm/src/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ impl OpenAIClient {
&e,
attempts,
max_retries,
streaming_callback,
)
.await
{
Expand Down
48 changes: 43 additions & 5 deletions crates/llm/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{ApiError, ApiErrorContext, RateLimitHandler};
use crate::{ApiError, ApiErrorContext, RateLimitHandler, StreamingCallback, StreamingChunk};
use anyhow::Result;
use reqwest::{Response, StatusCode};
use std::time::Duration;
Expand Down Expand Up @@ -39,26 +39,64 @@ pub async fn check_response_error<T: RateLimitHandler + std::fmt::Debug + Send +
/// Handle retryable errors and rate limiting for LLM providers.
/// Returns true if the error is retryable and we should continue the retry loop.
/// Returns false if we should exit the retry loop.
///
/// If a streaming_callback is provided, rate limit notifications will be sent to the UI.
pub async fn handle_retryable_error<
T: RateLimitHandler + std::fmt::Debug + Send + Sync + 'static,
>(
error: &anyhow::Error,
attempts: u32,
max_retries: u32,
streaming_callback: Option<&StreamingCallback>,
) -> bool {
if let Some(ctx) = error.downcast_ref::<ApiErrorContext<T>>() {
match &ctx.error {
ApiError::RateLimit(_) => {
if let Some(rate_limits) = &ctx.rate_limits {
if attempts < max_retries {
let delay = rate_limits.get_retry_delay();
let delay_secs = delay.as_secs();
warn!(
"Rate limit hit (attempt {}/{}), waiting {} seconds before retry",
attempts,
max_retries,
delay.as_secs()
attempts, max_retries, delay_secs
);
sleep(delay).await;

// Send rate limit notification if callback is available
if let Some(callback) = streaming_callback {
let _ = callback(&StreamingChunk::RateLimit {
seconds_remaining: delay_secs,
});

// Start a countdown with precise timing
let start_time = std::time::Instant::now();
let mut next_update = start_time + Duration::from_secs(1);

while start_time.elapsed() < delay {
// Sleep until the next update time, accounting for callback execution time
let now = std::time::Instant::now();
if now < next_update {
sleep(next_update - now).await;
}

// Calculate remaining time based on actual elapsed time
let elapsed = start_time.elapsed();
let remaining_secs = delay_secs.saturating_sub(elapsed.as_secs());

let _ = callback(&StreamingChunk::RateLimit {
seconds_remaining: remaining_secs,
});

// Schedule next update
next_update += Duration::from_secs(1);
}

// Clear the rate limit notification
let _ = callback(&StreamingChunk::RateLimitClear);
} else {
// No callback, just wait
sleep(delay).await;
}

return true;
}
} else {
Expand Down
1 change: 1 addition & 0 deletions crates/llm/src/vertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ impl VertexClient {
&e,
attempts,
max_retries,
streaming_callback,
)
.await
{
Expand Down
Loading