Conversation

@yanxi-chen
Collaborator

@yanxi-chen yanxi-chen commented Nov 13, 2025

Description

As the title says.

Motivation:

Issues can arise with the current queue buffer when sync_style = dynamic_by_explorer:

  • For a FIFO queue: if the trainer is slower than the explorer, the queue grows longer and longer (until it hits the capacity limit), and the staleness of samples used for training also keeps increasing.
  • For a priority queue: if (1) the priority function prefers fresher samples, and (2) the explorer slows down at some point (e.g., near the end of the whole RFT process, the explorer might have stopped completely while the trainer is still working), then the priority queue can output samples of increasing staleness, causing instability and even model performance degradation.

A minimal implementation of staleness control:

  • Maintain max_model_version in QueueStorage (which covers both the FIFO queue and the priority queue): the largest model version seen so far. This property is updated every time new samples are put into the storage.
  • When getting samples from the storage, those with excessive staleness are filtered out.
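
The two bullets above can be sketched as follows. The names QueueStorage, max_model_version, staleness_limit, and the exp.info["model_version"] field come from this PR; the class internals, method signatures, and the make_exp helper are illustrative assumptions, not the project's actual code:

```python
from collections import deque
from types import SimpleNamespace


def make_exp(model_version):
    # Hypothetical helper: a sample carrying only the metadata we need here.
    return SimpleNamespace(info={"model_version": model_version})


class QueueStorage:
    """Minimal sketch of a FIFO queue buffer with staleness control."""

    def __init__(self, staleness_limit=None):
        self.queue = deque()
        self.staleness_limit = staleness_limit
        # Largest model version seen so far, updated on every put().
        self.max_model_version = -1

    def put(self, exp_list):
        for exp in exp_list:
            if exp.info["model_version"] > self.max_model_version:
                self.max_model_version = exp.info["model_version"]
        self.queue.extend(exp_list)

    def get_batch(self, batch_size):
        batch = []
        while self.queue and len(batch) < batch_size:
            exp = self.queue.popleft()
            # Drop samples whose version lags too far behind the newest seen.
            if (
                self.staleness_limit is not None
                and self.staleness_limit > 0
                and exp.info["model_version"]
                < self.max_model_version - self.staleness_limit
            ):
                continue
            batch.append(exp)
        return batch
```

With staleness_limit=2 and a newest version of 5, any sample with version below 3 is discarded at retrieval time.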

Caveat:

For the priority queue, even a sample with excessive staleness might be put back into the queue, which is controlled by AsyncPriorityQueue. This should not affect the correctness of the staleness control mechanism, but it can be inefficient.

TODO:

  • Update AsyncPriorityQueue to account for staleness control.
  • Staleness control might be implemented as a SampleStrategy instead. Need to decide if this is a better option.

Checklist

Please check the following items before the code is ready to be reviewed.

  • Code has passed all tests
  • Docstrings have been added/updated in Google Style
  • Documentation has been updated
  • Code is ready for review

@gemini-code-assist
Contributor

Summary of Changes

Hello @yanxi-chen, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a crucial staleness control mechanism to the queue buffer system. Its primary purpose is to mitigate issues arising from a mismatch in processing speeds between the trainer and explorer, which can lead to an accumulation of stale samples and potential performance degradation. By tracking the latest model version and filtering out excessively old samples, this change ensures that the training process consistently receives fresh and relevant data, thereby enhancing stability and model performance.

Highlights

  • Staleness Control Implementation: Introduced a minimal mechanism to control sample staleness within the QueueStorage by tracking the maximum model version seen and filtering out old samples.
  • Configuration Update: Added a new staleness_limit parameter to StorageConfig to allow users to define the maximum acceptable staleness for samples.
  • Sample Filtering Logic: Implemented logic in get_batch to discard samples whose model version is significantly older than the most recent model version, based on the configured staleness_limit.
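
Based on the highlights above, the new config field might look like the following sketch. Only the StorageConfig name and the staleness_limit parameter are from this PR; the other fields and all defaults are hypothetical:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StorageConfig:
    # Hypothetical field: maximum number of items the queue buffer holds.
    capacity: int = 10000
    # New in this PR: maximum allowed gap between a sample's model version
    # and the newest version seen; None (the default) disables filtering.
    staleness_limit: Optional[int] = None
```

A user who wants to discard samples more than four versions old would pass StorageConfig(staleness_limit=4).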

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a mechanism for staleness control in the queue-based storage, which is a valuable addition to prevent training on outdated samples. The implementation adds a staleness_limit config and tracks the max_model_version to filter out stale experiences. My review focuses on potential efficiency improvements. I've pointed out a micro-optimization in updating max_model_version and a more significant performance concern related to how stale samples are handled with priority queues that have sample reuse enabled, which could lead to inefficiencies and timeouts.

Comment on lines +369 to +375
if (self.staleness_limit is not None) and (self.staleness_limit > 0):
    exp_list = [
        exp
        for exp in exp_list
        if exp.info["model_version"]
        >= self.max_model_version - self.staleness_limit
    ]
Contributor


Severity: high

The current implementation for staleness control has a significant inefficiency when used with a priority queue that has sample reuse enabled (reuse_cooldown_time is not None). As noted in the PR description, stale samples are filtered out after they have been retrieved from the queue, but AsyncPriorityQueue will have already re-queued them. This means stale samples are never truly purged from the buffer, and the queue might fill up with them. This can lead to get_batch repeatedly fetching and discarding stale samples, potentially causing timeouts and degrading performance.

A better approach would be to prevent re-queuing of stale samples. This logic could be moved into AsyncPriorityQueue so that it can check for staleness before re-queuing an item. This would likely require QueueStorage to provide the max_model_version to the queue instance whenever it's updated. While this is a WIP, this is a critical design point to address for the feature to be robust.
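
To illustrate the suggestion above, here is a minimal sketch of a priority queue that checks staleness before re-queuing. The class name AsyncPriorityQueue, the reuse behavior, and max_model_version come from the PR and this review thread; the heap layout, the externally updated max_model_version attribute, and all method names are assumptions:

```python
import heapq
import itertools


class AsyncPriorityQueue:
    """Sketch (not the project's actual class) of re-queue with a
    staleness check before the item goes back into the heap."""

    def __init__(self, staleness_limit=None):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal priorities
        self.staleness_limit = staleness_limit
        # Assumed to be pushed in by QueueStorage whenever it is updated.
        self.max_model_version = -1

    def push(self, priority, exp):
        heapq.heappush(self._heap, (priority, next(self._counter), exp))

    def pop(self, reuse=True):
        priority, _, exp = heapq.heappop(self._heap)
        stale = (
            self.staleness_limit is not None
            and exp["model_version"]
            < self.max_model_version - self.staleness_limit
        )
        # Re-queue only fresh samples; stale ones are purged here instead
        # of bouncing between get_batch and the queue indefinitely.
        if reuse and not stale:
            heapq.heappush(self._heap, (priority, next(self._counter), exp))
        return None if stale else exp
```

The key design point is that the staleness predicate runs before re-queuing, so a stale sample is dropped permanently rather than filtered in get_batch only to reappear on the next fetch.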

Collaborator Author


Makes sense; we need to update AsyncPriorityQueue to account for staleness control.

Comment on lines +356 to +358
for exp in exp_list:
if exp.info["model_version"] > self.max_model_version:
self.max_model_version = exp.info["model_version"]
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity: medium

The loop to update max_model_version can be made more efficient. Currently, it might perform multiple assignments to self.max_model_version within a single batch. A more efficient approach is to find the maximum version within the batch first, and then update self.max_model_version only once if needed.

Suggested change

Before:
for exp in exp_list:
    if exp.info["model_version"] > self.max_model_version:
        self.max_model_version = exp.info["model_version"]

After:
if exp_list:
    max_version_in_batch = max(exp.info["model_version"] for exp in exp_list)
    if max_version_in_batch > self.max_model_version:
        self.max_model_version = max_version_in_batch
