Skip to content

Conversation

tanmayv25
Copy link
Contributor

@tanmayv25 tanmayv25 commented Aug 15, 2025

Overview:

The process piping in certain scenario was causing unexpected hangs. Using a non-blocking thread to read and write the data from the process.

Summary by CodeRabbit

  • Tests

    • Streamlined process management to a single subprocess with a dedicated output thread for non-blocking, line-based logging.
    • Cleans logs at start and prefixes output lines for clearer diagnostics.
    • Safer shutdown with timeouts and improved lifecycle handling for processes and threads.
    • Enhances reliability and readability of test output.
  • Refactor

    • Replaces multi-process output pipeline with a simpler, more efficient design.
    • Reduces resource usage and complexity in test utilities.

Copy link
Contributor

coderabbitai bot commented Aug 15, 2025

Walkthrough

Refactors ManagedProcess to replace a multi-process stdout tee/sed pipeline with a single subprocess and a dedicated output thread that reads, prefixes, logs, and optionally prints lines. Adds lifecycle changes for startup, termination, thread joining, and safe stream closure. Introduces _proc and _output_thread fields and updates termination logic.

Changes

Cohort / File(s) Summary
ManagedProcess refactor
tests/utils/managed_process.py
Replace tee/sed pipeline with single Popen and daemon output thread; non-blocking, line-based stdout handling; clear log at start; uppercase command prefixing; updated exit to terminate process tree, join output thread, and close streams; new attributes _proc, _output_thread; text-mode Popen with stdout=PIPE, stderr=STDOUT; improved termination and logging.

Sequence Diagram(s)

sequenceDiagram
    participant TR as Test Runner
    participant MP as ManagedProcess
    participant SP as Subprocess (Popen)
    participant OT as Output Thread
    participant LOG as Log File
    participant STD as Stdout (optional)

    TR->>MP: __enter__() / start()
    MP->>SP: Popen(cmd, text=True, stdout=PIPE, stderr=STDOUT)
    MP->>OT: start(process_output(SP.stdout))
    OT->>SP: Non-blocking read lines
    loop For each line
        OT->>LOG: write("[CMD] line")
        alt live print enabled
            OT->>STD: print(line)
        end
    end

    TR-->>MP: __exit__()
    MP->>SP: terminate process tree
    MP->>SP: wait()
    MP->>OT: join(timeout=5s)
    MP->>SP: close stdout safely
    MP->>MP: cleanup data dir
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

I nibble logs with tidy grace,
Threads hop light, in single place.
No more tee or sed to chase—
One burrow, one clear output trace.
Thump! The process ends its race;
I twitch my nose, commit embraced. 🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (7)
tests/utils/managed_process.py (7)

18-22: Select on pipes is not portable; thread import is fine

select.select on file descriptors works on POSIX but not on Windows pipes. Since this utility may run in CI across different runners, consider a portable approach (e.g., a blocking reader loop in a dedicated thread) or guard the select usage with a platform fallback.

Would you like a simplified, portable output loop (blocking read in the thread) that avoids select entirely?


121-136: Avoid closing stdout while the output thread may still be alive; improve logging style

  • After join(timeout=5), guard the stream close to only happen if the thread finished; otherwise, skip closing to avoid spurious exceptions.
  • Prefer structured logging with exc_info=True over f-strings for exceptions.

Apply this diff:

         # Wait for output processing thread to finish
         if (
             hasattr(self, "_output_thread")
             and self._output_thread
             and self._output_thread.is_alive()
         ):
             self._logger.info("Waiting for output thread to finish...")
             self._output_thread.join(timeout=5)  # Give it 5 seconds to finish
 
-        # Now safely close file descriptors
-        if self._proc and self._proc.stdout:
-            try:
-                self._proc.stdout.close()
-            except Exception as e:
-                self._logger.warning(f"Error closing stdout: {e}")
+        # Now safely close file descriptors (only if output thread has finished)
+        if (
+            self._proc
+            and self._proc.stdout
+            and (not getattr(self, "_output_thread", None) or not self._output_thread.is_alive())
+        ):
+            try:
+                self._proc.stdout.close()
+            except Exception:
+                self._logger.warning("Error closing stdout", exc_info=True)
+        elif getattr(self, "_output_thread", None) and self._output_thread.is_alive():
+            self._logger.warning("Output thread did not finish within timeout; skipping stdout close")

171-181: Use line buffering and explicit encoding for more robust text handling

  • Add bufsize=1 to request line-buffering on our side and improve responsiveness.
  • Specify encoding="utf-8" and errors="replace" to avoid decode errors causing thread crashes.

Apply this diff:

         self._proc = subprocess.Popen(
             self.command,
             env=self.env or os.environ.copy(),
             cwd=self.working_dir,
             stdin=subprocess.DEVNULL,
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
+            bufsize=1,  # line-buffered to improve responsiveness in text mode
             start_new_session=True,  # Isolate process group to prevent kill 0 from affecting parent
-            text=True,  # Use text mode for easier line processing
+            text=True,  # Use text mode for easier line processing
+            encoding="utf-8",
+            errors="replace",
         )

224-225: Prefer structured logging with traceback over f-strings

Use exc_info=True to preserve stack traces and avoid eager string formatting.

Apply this diff:

-                            self._logger.warning(f"Line processing error: {e}")
+                            self._logger.warning("Line processing error", exc_info=True)
@@
-                self._logger.warning(f"Output processing error: {e}")
+                self._logger.warning("Output processing error", exc_info=True)

Also applies to: 231-233


234-235: Name the output thread for easier diagnostics

A thread name helps during debugging and when analyzing thread dumps.

Apply this diff:

-        self._output_thread = threading.Thread(target=process_output, daemon=True)
+        self._output_thread = threading.Thread(
+            target=process_output,
+            name=f"ManagedProcessOutput-{self._command_name}",
+            daemon=True,
+        )
         self._output_thread.start()

297-317: Avoid variable shadowing; rename loop var to improve readability

_proc (loop var) shadows self._proc. Rename to avoid confusion.

Apply this diff:

-            for _proc in psutil.process_iter(["name", "cmdline"]):
+            for ps_proc in psutil.process_iter(["name", "cmdline"]):
                 try:
                     if (
-                        _proc.name() == self._command_name
-                        or _proc.name() in self.stragglers
+                        ps_proc.name() == self._command_name
+                        or ps_proc.name() in self.stragglers
                     ):
                         self._logger.info(
-                            "Terminating Existing %s %s", _proc.name(), _proc.pid
+                            "Terminating Existing %s %s", ps_proc.name(), ps_proc.pid
                         )
 
-                        terminate_process_tree(_proc.pid, self._logger)
+                        terminate_process_tree(ps_proc.pid, self._logger)
                     for cmdline in self.straggler_commands:
-                        if cmdline in " ".join(_proc.cmdline()):
+                        if cmdline in " ".join(ps_proc.cmdline()):
                             self._logger.info(
                                 "Terminating Existing CmdLine %s %s %s",
-                                _proc.name(),
-                                _proc.pid,
-                                _proc.cmdline(),
+                                ps_proc.name(),
+                                ps_proc.pid,
+                                ps_proc.cmdline(),
                             )
-                            terminate_process_tree(_proc.pid, self._logger)
+                            terminate_process_tree(ps_proc.pid, self._logger)

171-236: Optional simplification: drop select and use a blocking line reader in the dedicated thread

Given the output is handled in its own thread and the process is terminated before joining, a blocking reader loop avoids the complexity and portability issues of select on file descriptors:

  • Portable across OSes (Windows pipes are not select-able).
  • Simpler and less error-prone.

Proposed sketch (for reference; replace the loop body accordingly):

with open(self._log_path, "w", encoding="utf-8") as f:
    for line in iter(self._proc.stdout.readline, ""):
        formatted = f"[{self._command_name.upper()}] {line.rstrip()}"
        if self.display_output:
            print(formatted, flush=True)
        f.write(formatted + "\n")
        f.flush()

This works because:

  • After terminate_process_tree(...), the child's stdout pipe is closed; readline() returns "" and the thread exits promptly.
  • No select, no platform caveats.
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 922850a and 67f9bab.

📒 Files selected for processing (1)
  • tests/utils/managed_process.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build and Test - dynamo
🔇 Additional comments (2)
tests/utils/managed_process.py (2)

88-91: New internal fields look good

Adding _proc and _output_thread simplifies lifecycle management and avoids the fragile multi-process tee/sed pipeline.


116-120: Orderly shutdown sequence LGTM

Terminating the process tree first and waiting for the main process is the right order to stop output generation before handling the output thread and stream closure.

@rmccorm4
Copy link
Contributor

Do you have a gitlab pipeline with trtllm tests? Current pipeline only ran vllm tests

@rmccorm4
Copy link
Contributor

@tanmayv25 can we get this merged to see if it helps stabilize?

@tanmayv25
Copy link
Contributor Author

tanmayv25 commented Aug 20, 2025

@tanmayv25 can we get this merged to see if it helps stabilize?

Unfortunately it is failing request migration tests. Need to spend some time to look into it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci Issues/PRs that reference CI build/test size/L
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants