Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions recipe/dapo/dapo_ray_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
)
from verl.utils.profiler import marked_timer
from verl.utils.rollout_skip import RolloutSkip

from verl.utils.pass_rate_tracker import PassRateTracker
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'PassRateTracker' is not used.

Suggested change
from verl.utils.pass_rate_tracker import PassRateTracker

Copilot uses AI. Check for mistakes.
from verl.utils.pass_rate_weighted_sampler import PassRateWeightedSampler

class RayDAPOTrainer(RayPPOTrainer):
"""
Expand All @@ -68,12 +69,19 @@ def fit(self):
config=OmegaConf.to_container(self.config, resolve=True),
)

self.global_steps = 0
self.global_steps = 0
self.gen_steps = 0

# load checkpoint before doing anything
self._load_checkpoint()

# Extract pass rate tracker from sampler if using curriculum learning
# The PassRateWeightedSampler owns the tracker internally but we need to manually update it during training
# Currently, we only support PassRateWeightedSampler for curriculum learning
self.pass_rate_tracker = None
self.data_sampler = self.train_dataloader.sampler # train_dataloader is created in `RayPPOTrainer._create_dataloader()` and always has a sampler
if isinstance(self.data_sampler, PassRateWeightedSampler):
self.pass_rate_tracker = self.data_sampler.pass_rate_tracker

# perform validation before training
# currently, we only support validation using the reward_function.
if self.val_reward_fn is not None and self.config.trainer.get("val_before_train", True):
Expand Down Expand Up @@ -135,7 +143,6 @@ def fit(self):
non_tensor_batch_keys=["raw_prompt_ids"],
)
gen_batch = gen_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)

is_last_step = self.global_steps >= self.total_training_steps

with marked_timer("step", timing_raw):
Expand Down Expand Up @@ -189,7 +196,6 @@ def fit(self):
reward_extra_infos_dict = {}

new_batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
new_batch.non_tensor_batch.update(
{k: np.array(v) for k, v in reward_extra_infos_dict.items()}
Expand All @@ -206,6 +212,46 @@ def fit(self):
else:
new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]

# === Curriculum Learning: Update pass rate tracker for weighted resampling ===
# When using PassRateWeightedSampler, track per-sample success rates to enable dynamic curriculum learning.
# The sampler uses these pass rates to adjust sampling probabilities in the next epoch.

# Note: move the pass rate tracker update into a utility function later
# 1. if the sampler is an instance of PassRateWeightedSampler, self.pass_rate_tracker is not None
# 2. the `dataset_index` field is added to the RL dataset to identify samples
if "dataset_index" in new_batch.non_tensor_batch and self.pass_rate_tracker is not None:
dataset_indices = new_batch.non_tensor_batch["dataset_index"]
# Sum token-level rewards to get sequence-level reward
seq_rewards = new_batch.batch["token_level_rewards"].sum(dim=-1).cpu().numpy()
# Success is 1 if sequence reward > 0, else 0
successes = (seq_rewards > 0).astype(float)

# Deduplicate: batch was repeated n times (interleaved), so we need to aggregate
unique_indices, inverse_indices = np.unique(dataset_indices, return_inverse=True)

# Aggregate successes: take mean across rollouts for each sample
aggregated_successes = np.zeros(len(unique_indices), dtype=float)
for i, _ in enumerate(unique_indices):
mask = inverse_indices == i # boolean array to indicate positions of unique index i
aggregated_successes[i] = np.mean(successes[mask]) # take average success across rollouts for sample i

pass_rates = self.pass_rate_tracker.get_pass_rates()

# Log curriculum metrics BEFORE updating tracker
# Track improvement of hardest samples (across all samples, not just attempted)
metrics['curriculum/hardest_10pct_pass_rate'] = float(np.percentile(pass_rates, 10))
metrics['curriculum/hardest_25pct_pass_rate'] = float(np.percentile(pass_rates, 25))
metrics['curriculum/hardest_50pct_pass_rate'] = float(np.percentile(pass_rates, 50))
metrics['curriculum/hardest_75pct_pass_rate'] = float(np.percentile(pass_rates, 75))

# Batch-level statistics
metrics['curriculum/min_batch_pass_rate'] = float(np.min(aggregated_successes))
metrics['curriculum/mean_batch_pass_rate'] = float(np.mean(aggregated_successes))
metrics['curriculum/effective_batch_size'] = np.sum(aggregated_successes > 0)/len(unique_indices)
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Division by zero is possible when len(unique_indices) is 0, though this is unlikely in practice. Consider adding a check to avoid potential runtime errors in edge cases.

Copilot uses AI. Check for mistakes.

# Update tracker with current batch results
self.pass_rate_tracker.update(sample_indices=unique_indices.astype(int), current_pass_rate=aggregated_successes)
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pass_rate_tracker is being updated on every worker/rank in a distributed training setup, but there's no synchronization across ranks. Each rank will maintain its own separate pass rate statistics based only on the samples it processes. This could lead to inconsistent curriculum learning behavior across different ranks. Consider adding a synchronization step (e.g., all-reduce) to aggregate pass rate statistics across all ranks, or ensure only rank 0 updates the tracker and then broadcasts the state.

Suggested change
self.pass_rate_tracker.update(sample_indices=unique_indices.astype(int), current_pass_rate=aggregated_successes)
# In distributed setups, ensure only rank 0 updates the tracker to avoid
# each rank maintaining inconsistent local pass-rate statistics.
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
self.pass_rate_tracker.update(
sample_indices=unique_indices.astype(int),
current_pass_rate=aggregated_successes,
)
else:
self.pass_rate_tracker.update(
sample_indices=unique_indices.astype(int),
current_pass_rate=aggregated_successes,
)

Copilot uses AI. Check for mistakes.

if not self.config.algorithm.filter_groups.enable:
batch = new_batch
else: # NOTE: When prompts after filtering is less than train batch size,
Expand Down Expand Up @@ -280,7 +326,6 @@ def fit(self):
# === Updating ===

batch.batch["response_mask"] = compute_response_mask(batch)

# Balance the number of valid tokens across DP ranks.
# NOTE: This usually changes the order of data in the `batch`,
# which won't affect the advantage calculation (since it's based on uid),
Expand Down Expand Up @@ -342,6 +387,7 @@ def fit(self):
actor_output = self.actor_rollout_wg.update_actor(batch)
actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"])
metrics.update(actor_output_metrics)
print("in critic warmup loop")

# Log rollout generations if enabled
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
Expand Down Expand Up @@ -430,6 +476,29 @@ def _to_sequence(value):
num_total_prompts = 0
num_gen_batches = 0

# Add curriculum learning metrics to W&B
if self.data_sampler is not None:
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition `if self.data_sampler is not None` is insufficient. It should check whether the sampler is an instance of `PassRateWeightedSampler` specifically, since `get_wandb_3d_plot_data` is only available on that class. Other samplers will cause an `AttributeError`. Change to: `if isinstance(self.data_sampler, PassRateWeightedSampler)`.

Suggested change
if self.data_sampler is not None:
if isinstance(self.data_sampler, PassRateWeightedSampler):

Copilot uses AI. Check for mistakes.
# Add 3D plot data for weight and count distributions (percentile-based)
try:
import wandb
weight_3d_data = self.data_sampler.get_wandb_3d_plot_data(metric_type='weight')
count_3d_data = self.data_sampler.get_wandb_3d_plot_data(metric_type='count')

# Add step to each data point for 3D visualization
for point in weight_3d_data:
point['step'] = self.global_steps
for point in count_3d_data:
point['step'] = self.global_steps

metrics['curriculum/weight_distribution_3d'] = wandb.Table(
dataframe=__import__('pandas').DataFrame(weight_3d_data)
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline imports of wandb and pandas (using `__import__`) inside the metrics logging code are an anti-pattern. These should be imported at the module level or handled more cleanly. This dynamic import pattern can cause issues with IDE autocomplete and type checking, and makes dependencies less clear. If wandb/pandas are optional dependencies, consider using a try-except block at the module level and setting a flag.

Copilot uses AI. Check for mistakes.
) if weight_3d_data else None
metrics['curriculum/count_distribution_3d'] = wandb.Table(
dataframe=__import__('pandas').DataFrame(count_3d_data)
) if count_3d_data else None
except ImportError:
pass # wandb or pandas not available

# TODO: make a canonical logger that supports various backends
logger.log(data=metrics, step=self.global_steps)

Expand Down
2 changes: 1 addition & 1 deletion scripts/tools/serve_math_llm_as_verifier.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#SBATCH --job-name=server_math_llm_as_verifier
#SBATCH --partition=higherprio
#SBATCH --partition=main
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=64
Expand Down
46 changes: 30 additions & 16 deletions scripts/train/example_multinode_rl_qwen2.5_32b_base_fsdp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@
#SBATCH --ntasks=8
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:8
#SBATCH --cpus-per-task=96
#SBATCH --cpus-per-task=128
#SBATCH --mem=0
#SBATCH --output=slurm/%x-%j.out
#SBATCH --error=slurm/%x-%j.err
#SBATCH --output=slurm/%x-%j.log
#SBATCH --error=slurm/%x-%j.log
#SBATCH --exclusive
#SBATCH --time=720:00:00
#SBATCH --partition=main
#SBATCH --account=iq


# =================== Frequently Used Variables ===================
RESUME_CKPT_DIR_NAME="" # Fill in the checkpoint directory name to resume from, otherwise from scratch
export STEM_LLM_JUDGE_URL="<STEM_LLM_JUDGE_URL>" # Fill in the llm-as-judge hosted URL, currently used only in 'STEM' domain

# =================== Cluster Environment ===================
export NCCL_DEBUG=info
export NCCL_ALGO=NVLSTree
export NCCL_IBEXT_DISABLE=1
export CONDA_BIN_PATH=/mnt/weka/home/jalaj.bhandari/miniconda3/envs/jalaj_sync_rl/bin/
export NCCL_TIMEOUT_SECONDS=4800
export TORCH_NCCL_ENABLE_MONITORING=0
export NCCL_DEBUG=warn
export NCCL_NET=IB
export NCCL_IB_HCA="mlx5_0,mlx5_1,mlx5_2,mlx5_3,mlx5_4,mlx5_5,mlx5_6,mlx5_7"
export NCCL_CROSS_NIC=1
export NCCL_IB_TC=136
export NCCL_SOCKET_IFNAME="^lo,docker,virbr"
export CUDA_DEVICE_MAX_CONNECTIONS=8
export NCCL_NVLS_ENABLE=1
export NCCL_IB_HCA=mlx5
export UCX_NET_DEVICES=mlx5_0:1,mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_5:1,mlx5_6:1,mlx5_7:1
export CUDA_DEVICE_MAX_CONNECTIONS=1
export CUDA_LAUNCH_BLOCKING=1

# Get the list of allocated nodes
nodes=( $(scontrol show hostnames "$SLURM_JOB_NODELIST") )
Expand All @@ -39,18 +44,25 @@ address_head=$head_node_ip:$port

export worker_num=$SLURM_NNODES
export HYDRA_FULL_ERROR=1
export VLLM_USE_V1=0
export VLLM_USE_V1=1

echo "Number of nodes (workers): $worker_num"

# =================== Data Mixture ===================
SHARED_DATA_PATH=./data
TRAIN_DATA_DIR=${SHARED_DATA_PATH}/train/
TEST_DATA_DIR=${SHARED_DATA_PATH}/online_eval/
# SHARED_DATA_PATH=/mnt/weka/shrd/k2tls/k2v2rl-data/
# TRAIN_DATA_DIR=${SHARED_DATA_PATH}/train/
# TEST_DATA_DIR=${SHARED_DATA_PATH}/online_eval/

TRAIN_DATA_DIR=/mnt/weka/shrd/k2tls/k2v2rl-data/data_mix_1/main_questions
TEST_DATA_DIR=/mnt/weka/shrd/k2tls/rl-test-data-12k

# Math (train)
math_train_path=${TRAIN_DATA_DIR}/math__combined_54.4k.parquet
# math_train_path=${TRAIN_DATA_DIR}/math__combined_54.4k.parquet
math_train_path=${TRAIN_DATA_DIR}/math__combined_118.2k.part2_scored.parquet
# Math (test)
math_test_path=${TEST_DATA_DIR}/math__math_500.parquet
aime_test_path=${TEST_DATA_DIR}/math__aime_repeated_8x_240.parquet
aime25_test_path2=${TEST_DATA_DIR}/math__aime2025_repeated_8x_240.parquet
amc_test_path=${TEST_DATA_DIR}/math__amc_repeated_4x_332.parquet

# Code (train)
Expand Down Expand Up @@ -93,7 +105,9 @@ webinstruct_train_path=${TRAIN_DATA_DIR}/stem__web_3.6k.parquet
supergpqa_test_path=${TEST_DATA_DIR}/stem__supergpqa_200.parquet

train_files="['${math_train_path}']" # Use math as example, add to more tasks as needed
test_files="['${math_test_path}','${aime_test_path}']" # Use math as example, add to more tasks as needed
# test_files="['${math_test_path}','${aime_test_path}']" # Use math as example, add to more tasks as needed
test_files="['${math_test_path}','${aime_test_path}','${aime25_test_path2}','${amc_test_path}']" # Use math as example, add to more tasks as needed


# =================== Model ===================
BASE_MODEL=Qwen/Qwen2.5-32B
Expand Down
48 changes: 34 additions & 14 deletions scripts/train/example_singlenode_rl_qwen2.5_7b_base_fsdp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,42 @@ WANDB_PROJECT="Reasoning360" # Your wandb project name

# --- External Services ---
export STEM_LLM_JUDGE_URL="<STEM_LLM_JUDGE_URL>" # Optional: Fill in the llm-as-judge hosted URL for 'STEM' domain evaluation
export MATH_LLM_JUDGE_URL="http://azure-uk-hpc-H200-instance-853:8000" # Fill in the OmniMATH llm-as-judge hosted URL, only used to score OmniMATH dataset if not empty
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MATH_LLM_JUDGE_URL contains a hardcoded internal hostname 'azure-uk-hpc-H200-instance-853:8000'. This should be replaced with a placeholder (like the STEM_LLM_JUDGE_URL pattern) to avoid leaking internal infrastructure details and to make the script more portable across different environments.

Suggested change
export MATH_LLM_JUDGE_URL="http://azure-uk-hpc-H200-instance-853:8000" # Fill in the OmniMATH llm-as-judge hosted URL, only used to score OmniMATH dataset if not empty
export MATH_LLM_JUDGE_URL="<MATH_LLM_JUDGE_URL>" # Optional: Fill in the OmniMATH llm-as-judge hosted URL, only used to score OmniMATH dataset if not empty

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MATH_LLM_JUDGE_URL is configured to use plain http://, so all OmniMATH scoring requests (including prompts, model outputs, and scores) will be transmitted in cleartext over the cluster network. An attacker or malicious tenant with network access could sniff or tamper with this traffic, corrupting evaluation results or exfiltrating potentially sensitive data. Use an HTTPS endpoint for the math judge service and ensure TLS certificate validation is enabled so these requests are encrypted and integrity-protected.

Suggested change
export MATH_LLM_JUDGE_URL="http://azure-uk-hpc-H200-instance-853:8000" # Fill in the OmniMATH llm-as-judge hosted URL, only used to score OmniMATH dataset if not empty
export MATH_LLM_JUDGE_URL="https://azure-uk-hpc-H200-instance-853:8000" # Fill in the OmniMATH llm-as-judge HTTPS URL, only used to score OmniMATH dataset if not empty

Copilot uses AI. Check for mistakes.

# =================== Environment Setup ===================
export NCCL_DEBUG=info
export CUDA_DEVICE_MAX_CONNECTIONS=1
# export CUDA_LAUNCH_BLOCKING=1 # Uncomment for easier debugging of CUDA errors

export HYDRA_FULL_ERROR=1
export VLLM_USE_V1=0
export VLLM_USE_V1=1

# =================== Data Mixture ===================
SHARED_DATA_PATH=./data
TRAIN_DATA_DIR=${SHARED_DATA_PATH}/train/
TEST_DATA_DIR=${SHARED_DATA_PATH}/online_eval/

# ---- old path ----
# SHARED_DATA_PATH=/mnt/sharefs/users/zhuojun.cheng
# SHARED_MODEL_PATH=/mnt/sharefs/users/haonan.li/models
# TRAIN_DATA_DIR=${SHARED_DATA_PATH}/guru_data/train/guru92k_release_0603
# TEST_DATA_DIR=${SHARED_DATA_PATH}/guru_data/test/online # ← unchanged

# Math (train)
math_train_path=${TRAIN_DATA_DIR}/math__combined_54.4k.parquet
# Math (test)
# math_train_path=${TRAIN_DATA_DIR}/math__combined_54.4k.parquet

# ---- Math datasets tried for curriculum learning ----
math_train_path=/mnt/weka/shrd/k2tls/k2v2rl-data/data-mixtures/pass_rate_sampling/medium/data_mix_math_20000/main_questions/math__combined_118.2k.part2_scored.parquet
# math_test_path=/mnt/weka/shrd/k2tls/k2v2rl-data/data-mixtures/pass_rate_sampling/medium/data_mix_math_20000/main_questions/math__combined_118.2k.part2_scored.parquet

# math_train_path=/mnt/weka/shrd/k2tls/jalaj/data-mixtures/medium/data_mix_math_3000/main_questions/math__combined_118.2k.part1_scored.parquet
# math_test_path=/mnt/weka/shrd/k2tls/jalaj/data-mixtures/medium/data_mix_math_3000/main_questions/math__combined_118.2k.part1_scored.parquet

# math_train_path=/mnt/weka/shrd/k2tls/jalaj/data-mixtures_round2/medium/data_mix_math_9000/main_questions/math__combined_118.2k.part1_scored.parquet
# math_train_path2=/mnt/weka/shrd/k2tls/jalaj/data-mixtures_round2/medium/data_mix_math_9000/main_questions/math__combined_118.2k.part2_scored.parquet

# test data: new data path for all test datasets
TEST_DATA_DIR=/mnt/weka/shrd/k2tls/rl-test-data-12k
math_test_path=${TEST_DATA_DIR}/math__math_500.parquet
aime_test_path=${TEST_DATA_DIR}/math__aime_repeated_8x_240.parquet
aime25_test_path2=${TEST_DATA_DIR}/math__aime2025_repeated_8x_240.parquet
amc_test_path=${TEST_DATA_DIR}/math__amc_repeated_4x_332.parquet

# Code (train)
Expand Down Expand Up @@ -71,8 +88,9 @@ webinstruct_train_path=${TRAIN_DATA_DIR}/stem__web_3.6k.parquet
supergpqa_test_path=${TEST_DATA_DIR}/stem__supergpqa_200.parquet

train_files="['${math_train_path}']" # Use math as example, add to more tasks as needed
test_files="['${math_test_path}','${aime_test_path}']" # Use math as example, add to more tasks as needed

# train_files="['${math_train_path}', '${math_train_path2}']" # Use math as example, add to more tasks as needed
# test_files="['${math_test_path}','${aime_test_path}']" # Use math as example, add to more tasks as needed
test_files="['${math_test_path}','${aime_test_path}','${aime25_test_path2}','${amc_test_path}']" # Use math as example, add to more tasks as needed
# =================== Model ===================
BASE_MODEL=Qwen/Qwen2.5-7B

Expand All @@ -95,7 +113,6 @@ echo "Starting Ray on the local node with ${NUM_GPUS} GPUs..."
${CONDA_BIN_PATH}ray start --head --num-gpus ${NUM_GPUS} --include-dashboard=True --dashboard-port 8265
sleep 5


# =================== RL Config ===================
# Note: we borrow the config format from DAPO, but disable all DAPO features here to run the naive RL baseline.

Expand All @@ -116,14 +133,13 @@ overlong_buffer_len=$((1024 * 4))
overlong_penalty_factor=1.0

loss_agg_mode="token-mean"

enable_filter_groups=False
filter_groups_metric=acc
max_num_gen_batches=10
train_prompt_bsz=512 # on-policy model update batchsize: train_prompt_bsz * rollout.n
train_prompt_bsz=64 # on-policy model update batchsize: train_prompt_bsz * rollout.n
gen_prompt_bsz=$((train_prompt_bsz * 1))
n_resp_per_prompt=16
train_prompt_mini_bsz=64 # model grad update batchsize
n_resp_per_prompt=8
train_prompt_mini_bsz=32 # model grad update batchsize

# Algorithm
temperature=1.0
Expand Down Expand Up @@ -232,4 +248,8 @@ python -m recipe.dapo.main_dapo \
trainer.test_freq=10 \
trainer.total_epochs=10 \
trainer.log_val_generations=50 \
trainer.resume_mode=auto
trainer.resume_mode=auto
# set the following to enable pass-rate based weighted sampling
# data.sampler.class_path='pkg://verl.utils.pass_rate_weighted_sampler' \
# data.sampler.class_name='PassRateWeightedSampler' \
# data.sampler.pass_rate_temperature=0.5 \
Loading