Skip to content

perf: cache ws connections #768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 4, 2025
Merged

perf: cache ws connections #768

merged 1 commit into from
Jun 4, 2025

Conversation

junhyr
Copy link
Collaborator

@junhyr junhyr commented Jun 4, 2025

PR Type

Enhancement


Description

  • Introduce global WS connection cache

  • Centralize WebSocket reconnection logic

  • Store prompts and submission state in cache

  • Refactor hook to use subscribers


Changes walkthrough 📝

Relevant files
Enhancement
usePromptQueue.ts
Cache WebSocket connections in hook                                           

apps/app/app/hooks/usePromptQueue.ts

  • Added CacheEntry interface and promptQueueCache map
  • Introduced getCacheEntry, notifySubscribers helpers
  • Abstracted WebSocket connect/reconnect into functions
  • Updated usePromptQueue to use cached state and subscribers
  • +222/-121

    Need help?
  • Type /help how to ... in the comments thread for any questions about PR-Agent usage.
  • Check out the documentation for more information.
  • Copy link

    vercel bot commented Jun 4, 2025

    The latest updates on your projects. Learn more about Vercel for Git ↗︎

    Name Status Preview Comments Updated (UTC)
    pipelines-app 🔄 Building (Inspect) Visit Preview 💬 Add feedback Jun 4, 2025 11:15am

    @junhyr junhyr temporarily deployed to Preview - e2e June 4, 2025 11:15 — with GitHub Actions Inactive
    @junhyr junhyr changed the title feat: cache ws connections perf: cache ws connections Jun 4, 2025
    @junhyr junhyr merged commit d33193c into main Jun 4, 2025
    5 of 6 checks passed
    @junhyr junhyr deleted the jun/queue-fix branch June 4, 2025 11:15
    Copy link

    github-actions bot commented Jun 4, 2025

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Reconnection Cleanup

    Ensure reconnectTimer is cleared and WebSocket is properly closed when subscribers drop to zero to avoid lingering timers or sockets.

      if (entry.reconnectTimer) {
        clearTimeout(entry.reconnectTimer);
        entry.reconnectTimer = null;
      }
    
      const wsUrl = process.env.NEXT_PUBLIC_API_WS_URL || "ws://localhost:8080";
      const ws = new WebSocket(
        `${wsUrl}/ws?streamKey=${encodeURIComponent(streamKey)}`,
      );
    
      ws.onopen = () => {
        const currentEntry = promptQueueCache.get(streamKey);
        if (currentEntry && currentEntry.subscribers.size > 0) {
          currentEntry.ws = ws;
        } else {
          ws.close();
        }
      };
    
      ws.onmessage = event => {
        const currentEntry = promptQueueCache.get(streamKey);
        if (!currentEntry || currentEntry.ws !== ws) {
          return;
        }
    
        try {
          const message = JSON.parse(event.data);
          console.log("onmessage", message.type);
    
          switch (message.type) {
            case "initial":
              currentEntry.currentPrompt = message.payload.currentPrompt;
              currentEntry.recentPrompts = message.payload.recentPrompts || [];
              notifySubscribers(streamKey);
              break;
    
            case "CurrentPrompt":
              currentEntry.currentPrompt = message.payload.prompt;
              notifySubscribers(streamKey);
              break;
    
            case "RecentPromptsUpdate":
              currentEntry.recentPrompts = message.payload.recent_prompts || [];
              notifySubscribers(streamKey);
              break;
    
            default:
          }
        } catch (error) {
          console.error("Error parsing WebSocket message:", error);
        }
      };
    
      ws.onclose = event => {
        const currentEntry = promptQueueCache.get(streamKey);
        if (!currentEntry || currentEntry.ws !== ws) {
          return;
        }
    
        currentEntry.ws = null;
    
        if (
          event.code !== 1000 &&
          event.code !== 1001 &&
          currentEntry.subscribers.size > 0
        ) {
          console.log("Attempting to reconnect in 3 seconds...");
          currentEntry.reconnectTimer = setTimeout(() => {
            const stillCurrentEntry = promptQueueCache.get(streamKey);
            if (stillCurrentEntry && stillCurrentEntry.subscribers.size > 0) {
              connectWebSocket(streamKey);
            }
          }, 3000);
        }
      };
    
      ws.onerror = error => {
        console.error("WebSocket error for streamKey:", streamKey, error);
      };
    }
    
    function cleanupCacheEntry(streamKey: string) {
      const entry = promptQueueCache.get(streamKey);
    State Race

    The hooks for submitPrompt and addRandomPrompt capture isSubmitting from closure; verify that it stays up-to-date to prevent duplicate submissions or stale state.

    const submitPrompt = useCallback(
      async (text: string) => {
        if (!text.trim() || !streamKey || isSubmitting) return false;
    
        const entry = getCacheEntry(streamKey);
        entry.isSubmitting = true;
        notifySubscribers(streamKey);
    
        try {
          const apiUrl =
            process.env.NEXT_PUBLIC_API_URL || "http://localhost:8080";
          const response = await fetch(`${apiUrl}/prompts`, {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({
              text: text.trim(),
              streamKey,
            }),
          });
    
          if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
          }
    
          const result = await response.json();
    
          track("daydream_landing_page_prompt_submitted", {
            is_authenticated: authenticated,
            prompt: text,
            nsfw: result?.wasCensored || false,
            stream_key: streamKey,
          });
    
          return true;
        } catch (error) {
          return false;
        } finally {
          const currentEntry = promptQueueCache.get(streamKey);
          if (currentEntry) {
            currentEntry.isSubmitting = false;
            notifySubscribers(streamKey);
          }
        }
      },
      [streamKey, isSubmitting, authenticated],
    );
    
    const addRandomPrompt = useCallback(async () => {
      if (!streamKey || isSubmitting) return false;
    
      const entry = getCacheEntry(streamKey);
      entry.isSubmitting = true;
      notifySubscribers(streamKey);
    
      try {
        const apiUrl = process.env.NEXT_PUBLIC_API_URL || "http://localhost:8080";
        const response = await fetch(
          `${apiUrl}/prompts?streamKey=${encodeURIComponent(streamKey)}`,
          {
            method: "PUT",
          },
        );
    
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
    
        return true;
      } catch (error) {
        return false;
      } finally {
        const currentEntry = promptQueueCache.get(streamKey);
        if (currentEntry) {
          currentEntry.isSubmitting = false;
          notifySubscribers(streamKey);
        }
      }
    }, [streamKey, isSubmitting]);
    API Exposure

    The returned wsRef exposes the raw WebSocket; consider encapsulating or documenting its usage to avoid misuse.

      wsRef: { current: entry?.ws || null },
      getHighlightedIndex,
      submitPrompt,
      addRandomPrompt,
    };

    Copy link

    github-actions bot commented Jun 4, 2025

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Impact
    General
    Initialize ws immediately

    To prevent race conditions and duplicate connection attempts, assign entry.ws
    immediately after creating the socket. This ensures concurrent calls see an
    in‐progress connection.

    apps/app/app/hooks/usePromptQueue.ts [57-70]

     if (entry.ws) {
       entry.ws.close();
       entry.ws = null;
     }
    -...
     const ws = new WebSocket(
       `${wsUrl}/ws?streamKey=${encodeURIComponent(streamKey)}`,
     );
    +entry.ws = ws;
    Suggestion importance[1-10]: 7

    __

    Why: Assigning entry.ws = ws right after socket creation reduces race conditions by marking the connection in-progress for concurrent calls.

    Medium
    Use correct browser timer type

    The NodeJS.Timeout type is not the correct return type for browser setTimeout calls.
    Use ReturnType (or number) so it aligns with the DOM timer return
    value.

    apps/app/app/hooks/usePromptQueue.ts [28]

    -reconnectTimer: NodeJS.Timeout | null;
    +reconnectTimer: ReturnType<typeof setTimeout> | null;
    Suggestion importance[1-10]: 6

    __

    Why: Using NodeJS.Timeout mixes Node and browser types; ReturnType<typeof setTimeout> ensures the correct DOM timer return type.

    Low
    Possible issue
    Reference submission flag from cache

    Avoid relying on the closed-over isSubmitting value which can become stale. Read the
    submission flag directly from the cached entry so you always check the current
    state.

    apps/app/app/hooks/usePromptQueue.ts [220]

    -if (!text.trim() || !streamKey || isSubmitting) return false;
    +const entry = getCacheEntry(streamKey);
    +if (!text.trim() || !streamKey || entry.isSubmitting) return false;
    Suggestion importance[1-10]: 7

    __

    Why: Reading entry.isSubmitting from the cache prevents stale closure values and ensures the current submission state is checked.

    Medium

    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    1 participant