Advanced Usage

Batch scoring for GRPO

GRPO samples N completions per prompt, scores each one, and computes group-relative advantages from those scores:

from deepgym import DeepGym, load_environment

dg = DeepGym(mode='local')
env = load_environment('coin_change')

# Sample N completions from your policy; `model` and `prompt` are yours to supply
solutions = [model.generate(prompt) for _ in range(8)]
batch = dg.run_batch(env, solutions, max_parallel=8)

# Group-relative advantages: normalize scores to zero mean, unit variance
scores = [r.score for r in batch.results]
mean = sum(scores) / len(scores)
std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5
advantages = [(s - mean) / (std + 1e-8) for s in scores]
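
This zero-mean, unit-variance normalization recurs below, so it can be factored into a small helper (plain Python, no DeepGym dependency; the epsilon guards the zero-variance case where every completion scores the same):

def group_advantages(scores, eps=1e-8):
    # GRPO-style group normalization: zero mean, unit variance.
    # A zero-variance group (all completions score alike) yields ~0 advantages.
    mean = sum(scores) / len(scores)
    std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5
    return [(s - mean) / (std + eps) for s in scores]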

With shaped rewards

Use correctness + efficiency for a richer signal:

rewards = []
for result in batch.results:
    if result.reward_components:
        # Weighted blend; tune the 0.7 / 0.3 split for your task
        r = 0.7 * result.reward_components.get('correctness', 0.0) \
          + 0.3 * result.reward_components.get('efficiency', 0.0)
    else:
        r = result.score
    rewards.append(r)

mean = sum(rewards) / len(rewards)
std = (sum((x - mean) ** 2 for x in rewards) / len(rewards)) ** 0.5
advantages = [(x - mean) / (std + 1e-8) for x in rewards]

Per-test rewards

Score each test case individually when you want finer-grained credit assignment:

from deepgym.integrations.reward import RewardFunction

reward_fn = RewardFunction(env, max_parallel=8)
per_test = reward_fn.per_test_rewards(solutions)
# [
#   {'test_0': 1.0, 'test_1': 0.0, 'test_2': 1.0, ..., 'overall': 0.75},
#   {'test_0': 1.0, 'test_1': 1.0, 'test_2': 1.0, ..., 'overall': 1.0},
#   ...
# ]
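
The per-test breakdown is useful beyond reward shaping. Averaging each test's score across the batch, for instance, surfaces the cases the model fails most often; this sketch assumes only the dict layout shown above:

from collections import defaultdict

totals = defaultdict(float)
for rewards in per_test:
    for test_id, score in rewards.items():
        if test_id != 'overall':
            totals[test_id] += score

# Lowest average first: the tests the model struggles with most
for test_id, total in sorted(totals.items(), key=lambda kv: kv[1]):
    print(f'{test_id}: {total / len(per_test):.2f} avg score')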

Gymnasium-style API

Standard reset() / step() interface:

from deepgym.gym import DeepGymEnv
from deepgym import load_environment

env = load_environment('coin_change')
gym_env = DeepGymEnv(environment=env, max_steps=3)

obs = gym_env.reset()
print(obs.task)  # "Write a function coin_change..."

obs, reward, done, info = gym_env.step('def coin_change(coins, amount): ...')
print(f'Reward: {reward}, Done: {done}')
print(f'Score: {info.score}, Cases: {info.cases}')

Batch stepping:

results = gym_env.step_batch(
    ['solution1', 'solution2', 'solution3'],
    max_parallel=10,
)
for obs, reward, done, info in results:
    print(f'{reward:.2f}')
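
The reset() / step() pair composes into an ordinary rollout loop. A minimal sketch, where model.generate stands in for your policy and is not part of DeepGym:

obs = gym_env.reset()
task = obs.task
done = False
while not done:
    # Regenerate from the task each turn; a real loop would also
    # condition on feedback carried in the new observation
    attempt = model.generate(task)
    obs, reward, done, info = gym_env.step(attempt)
    print(f'reward={reward:.2f}, done={done}')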

Multi-turn episodes

For problems that need multiple interactions (debugging, iterative refinement):

from deepgym.multi_turn import MultiTurnRunner
from deepgym.models import MultiTurnEnvironment, Action

env = MultiTurnEnvironment(
    task='Fix the buggy sorting function.',
    setup_code='# initial buggy code here...',
    step_verifier_code='# check intermediate progress...',
    final_verifier_code='# check final solution...',
    max_steps=5,
    timeout_per_step=30,
)

def my_agent(observation):
    return Action(content='fixed code...', action_type='code')

runner = MultiTurnRunner(safe_mode=True)
trajectory, result = runner.run(env, agent=my_agent)

print(f'Total reward: {trajectory.total_reward}')
print(f'Steps: {len(trajectory.steps)}')
print(f'Step rewards: {trajectory.step_rewards}')
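
The agent is just a callable, so stateful strategies are easy to express. A sketch of an agent that works through a list of pre-generated candidate fixes (the candidates themselves are placeholders):

candidates = ['fix attempt 1...', 'fix attempt 2...', 'fix attempt 3...']
attempts = iter(candidates)

def retry_agent(observation):
    # Submit the next candidate; repeat the last one once the list runs out
    return Action(content=next(attempts, candidates[-1]), action_type='code')

trajectory, result = runner.run(env, agent=retry_agent)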

Async batch processing

When you need throughput, use the async client:

import asyncio
from deepgym import AsyncDeepGym, load_environment

async def score_all():
    dg = AsyncDeepGym(mode='daytona')
    envs = ['coin_change', 'two_sum', 'climbing_stairs']

    # `solutions` here is a dict mapping environment name -> candidate solution
    tasks = [
        dg.run(load_environment(name), solutions[name])
        for name in envs
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for name, result in zip(envs, results):
        if isinstance(result, Exception):
            print(f'{name}: ERROR - {result}')
        else:
            print(f'{name}: {result.score:.2f}')

asyncio.run(score_all())
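
If many environments would outrun your sandbox capacity, a plain asyncio.Semaphore caps concurrency; the limit of 4 below is an arbitrary example:

sem = asyncio.Semaphore(4)  # at most 4 runs in flight at once

async def bounded_run(dg, name):
    async with sem:
        return await dg.run(load_environment(name), solutions[name])

# Drop-in replacement for the task list above
tasks = [bounded_run(dg, name) for name in envs]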

Remote API

Point AsyncDeepGym at a remote server:

dg = AsyncDeepGym(api_url='https://your-deepgym-server.com')
result = await dg.run(env, model_output=solution)

Same API; requests just go over HTTP.
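
Since run is a coroutine, the call above has to live inside an event loop; a complete minimal version, with solution as a placeholder for your model output:

import asyncio
from deepgym import AsyncDeepGym, load_environment

async def main():
    dg = AsyncDeepGym(api_url='https://your-deepgym-server.com')
    env = load_environment('coin_change')
    result = await dg.run(env, model_output=solution)
    print(result.score)

asyncio.run(main())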


Custom reward composition

Combine multiple signals:

from deepgym.integrations.reward import RewardFunction

reward_fn = RewardFunction(env, max_parallel=8)
batch = reward_fn.call_with_details(solutions)

rewards = []
for result in batch.results:
    r = result.score

    # bonus for efficiency
    if result.reward_components and 'efficiency' in result.reward_components:
        r += 0.1 * result.reward_components['efficiency']

    # penalty for timeout
    if result.truncated:
        r *= 0.5

    # extra credit for edge-case tests (ids containing 'edge')
    if result.cases:
        hard_tests = [c for c in result.cases if 'edge' in c.id]
        if hard_tests:
            hard_score = sum(c.score for c in hard_tests) / len(hard_tests)
            r += 0.2 * hard_score

    rewards.append(min(r, 1.0))  # cap the composed reward at 1.0
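
The composed rewards slot straight into the group normalization from the GRPO section:

advantages = group_advantages(rewards)  # helper defined under "Batch scoring for GRPO"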
