Skip to content

feat: remove random prompts #765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 2 additions & 61 deletions apps/api/src/services/prompt_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::models::{Prompt, WsMessage};
use crate::models::WsMessage;
use crate::services::stream_api;
use crate::state::AppState;
use anyhow::Result;
Expand All @@ -8,7 +8,7 @@ use std::sync::Arc;
use tokio::time;
use tracing::{error, info};

const RANDOM_PROMPTS: &[&str] = &[
const _RANDOM_PROMPTS: &[&str] = &[
"hyperrealistic portrait of an alien queen --quality 3",
"fantasy castle floating among clouds at sunset --creativity 0.8",
"cybernetic ((animal)) with glowing parts --quality 2",
Expand Down Expand Up @@ -39,7 +39,6 @@ pub async fn run(state: Arc<AppState>) -> Result<()> {

let mut interval = time::interval(time::Duration::from_secs(1));
let failure_tracker: Arc<DashMap<String, DateTime<Utc>>> = Arc::new(DashMap::new());
let last_prompt_activity: Arc<DashMap<String, DateTime<Utc>>> = Arc::new(DashMap::new());

loop {
interval.tick().await;
Expand All @@ -55,23 +54,6 @@ pub async fn run(state: Arc<AppState>) -> Result<()> {
}
}

match check_and_add_random_prompt_if_needed(&state, stream_key, &last_prompt_activity)
.await
{
Ok(added) => {
if added {
info!("Random prompt added for inactive stream: {}", stream_key);
last_prompt_activity.insert(stream_key.to_string(), Utc::now());
}
}
Err(e) => {
error!(
"Error adding random prompt for stream {}: {}",
stream_key, e
);
}
}

match check_and_update_prompt(&state, stream_key).await {
Ok(updated) => {
if updated {
Expand All @@ -88,47 +70,6 @@ pub async fn run(state: Arc<AppState>) -> Result<()> {
}
}

async fn check_and_add_random_prompt_if_needed(
state: &Arc<AppState>,
stream_key: &str,
last_activity_tracker: &Arc<DashMap<String, DateTime<Utc>>>,
) -> Result<bool> {
let queue_length = state.redis.get_queue_length(stream_key).await?;

if queue_length > 0 {
last_activity_tracker.insert(stream_key.to_string(), Utc::now());
return Ok(false);
}

let now = Utc::now();
let should_add_random = match last_activity_tracker.get(stream_key) {
Some(last_activity_entry) => {
let last_activity = *last_activity_entry;
let idle_duration = now - last_activity;
idle_duration >= Duration::seconds(20)
}
None => {
last_activity_tracker.insert(stream_key.to_string(), now);
false
}
};

if should_add_random {
let random_index = (now.timestamp_millis() as usize) % RANDOM_PROMPTS.len();
let random_prompt_text = RANDOM_PROMPTS[random_index];

let prompt = Prompt::new(random_prompt_text.to_string(), stream_key.to_string());
state.redis.add_prompt_to_queue(prompt).await?;
info!(
"Added random prompt to stream {}: {}",
stream_key, random_prompt_text
);
return Ok(true);
}

Ok(false)
}

async fn check_and_update_prompt(state: &Arc<AppState>, stream_key: &str) -> Result<bool> {
let current_prompt = state.redis.get_current_prompt(stream_key).await?;

Expand Down
23 changes: 0 additions & 23 deletions apps/app/app/hooks/usePromptQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ export function usePromptQueue(streamKey: string | undefined) {

useEffect(() => {
if (currentStreamKeyRef.current !== streamKey) {
console.log(
"StreamKey changed, resetting state:",
currentStreamKeyRef.current,
"->",
streamKey,
);
setCurrentPrompt(null);
setRecentPrompts([]);
setIsSubmitting(false);
Expand Down Expand Up @@ -63,28 +57,20 @@ export function usePromptQueue(streamKey: string | undefined) {
);

ws.onopen = () => {
console.log("WebSocket connected for streamKey:", streamKey);
if (currentStreamKeyRef.current === streamKey) {
wsRef.current = ws;
} else {
console.log("StreamKey changed during connection, closing WebSocket");
ws.close();
}
};

ws.onmessage = event => {
if (wsRef.current !== ws || currentStreamKeyRef.current !== streamKey) {
console.log("Ignoring message from outdated WebSocket connection");
return;
}

try {
const message = JSON.parse(event.data);
console.log(
"WebSocket message received for streamKey:",
streamKey,
message,
);

switch (message.type) {
case "initial":
Expand All @@ -101,21 +87,13 @@ export function usePromptQueue(streamKey: string | undefined) {
break;

default:
console.log("Unknown message type:", message.type);
}
} catch (error) {
console.error("Error parsing WebSocket message:", error);
}
};

ws.onclose = event => {
console.log(
"WebSocket disconnected for streamKey:",
streamKey,
"Code:",
event.code,
);

if (wsRef.current !== ws || currentStreamKeyRef.current !== streamKey) {
return;
}
Expand All @@ -142,7 +120,6 @@ export function usePromptQueue(streamKey: string | undefined) {
connectWebsocket();

return () => {
console.log("Cleaning up WebSocket for streamKey:", streamKey);
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
Expand Down
Loading