@codeflash-ai codeflash-ai bot commented Nov 21, 2025

📄 47% (0.47x) speedup for AsyncOperationPool._get_operation in skyvern/forge/async_operations.py

⏱️ Runtime : 161 microseconds → 109 microseconds (best of 250 runs)

📝 Explanation and details

The optimization replaces exception-based control flow with explicit None checks using the dictionary .get() method, resulting in a 47% speedup.

Key Changes:

  1. Eliminated try/except overhead: The original code used KeyError exception handling for missing keys, which is expensive in Python whenever the exception is actually raised, due to exception creation and stack unwinding
  2. Two-step lookup with .get(): Instead of self._operations[task_id][agent_phase], the optimized version uses self._operations.get(task_id) followed by task_ops.get(agent_phase) if the first lookup succeeds

Why This Is Faster:

  • Exception handling cost: The line profiler shows 87% of calls (519/596) raised KeyError in the original code. Handling an exception in Python involves creating the exception object, unwinding the stack, and jumping to the handler, all of which are expensive
  • Dictionary .get() efficiency: The .get() method returns None for missing keys without raising, making it much faster for lookup-heavy code with frequent misses
  • Early termination: The optimized version returns None immediately after the first failed lookup, without raising an exception or attempting the second dictionary access
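
To check the cost argument above on your own machine, a small `timeit` comparison along these lines may help. It is a sketch under assumptions: the `operations` data, the function names, and the repeat counts are illustrative and do not come from the PR, and absolute numbers will vary by Python version and hardware.

```python
import timeit

operations = {"task1": {"INIT": "op1"}}  # illustrative data, not from the PR


def lookup_try(task_id, phase):
    # Exception-based lookup: a miss raises and catches KeyError.
    try:
        return operations[task_id][phase]
    except KeyError:
        return None


def lookup_get(task_id, phase):
    # .get()-based lookup: a miss returns None without raising.
    task_ops = operations.get(task_id)
    return task_ops.get(phase) if task_ops is not None else None


def time_ns_per_call(func, task_id, phase, number=200_000):
    # Best of five repeats, reported as nanoseconds per call.
    best = min(timeit.repeat(lambda: func(task_id, phase), number=number, repeat=5))
    return best / number * 1e9


if __name__ == "__main__":
    for label, args in [("hit", ("task1", "INIT")), ("miss", ("unknown", "INIT"))]:
        t_try = time_ns_per_call(lookup_try, *args)
        t_get = time_ns_per_call(lookup_get, *args)
        print(f"{label}: try/except {t_try:.0f} ns, .get() {t_get:.0f} ns")
```

The lambda wrapper adds the same constant overhead to both variants, so the difference between the two columns is more meaningful than either absolute value.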

Performance Analysis from Tests:

  • Miss-heavy scenarios see biggest gains: When task_id doesn't exist, the measured speedups range from about 49% to 178% (e.g., test_get_operation_empty_operations_dict shows 61-75% across the two generated suites); when only agent_phase is missing, they range from about 29% to 121%
  • Hit scenarios have small overhead: When both keys exist, the optimized version is roughly 5-23% slower due to the additional conditional check
  • Overall benefit: Since the line profiler shows 87% of calls were misses in the test workload, the optimization delivers substantial net gains

The trade-off favors workloads where lookup misses are common, making this optimization particularly valuable for async operation pools where many lookups may fail to find existing operations. For hit-dominated workloads, the original version may remain slightly faster.
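
The trade-off can be made concrete with a small break-even calculation. The sketch below is a rough estimate, not a result reported by Codeflash: the function names are hypothetical, and it plugs in one pair of single-run timings from the generated tests (test_get_operation_basic_existing for a hit, test_get_operation_basic_nonexistent_task for a miss), which are noisy at the nanosecond scale.

```python
def expected_ns(miss_rate, hit_ns, miss_ns):
    # Average cost per call for a workload with the given fraction of misses.
    return miss_rate * miss_ns + (1.0 - miss_rate) * hit_ns


def break_even_miss_rate(hit_before, hit_after, miss_before, miss_after):
    # Miss fraction at which the extra cost on hits equals the saving on misses.
    hit_penalty = hit_after - hit_before
    miss_saving = miss_before - miss_after
    return hit_penalty / (hit_penalty + miss_saving)


# Timings in nanoseconds, copied from the generated test annotations.
HIT_BEFORE, HIT_AFTER = 535, 575    # test_get_operation_basic_existing
MISS_BEFORE, MISS_AFTER = 786, 440  # test_get_operation_basic_nonexistent_task

threshold = break_even_miss_rate(HIT_BEFORE, HIT_AFTER, MISS_BEFORE, MISS_AFTER)

# Profiled miss rate from the explanation above: 519 misses out of 596 calls.
profiled_miss_rate = 519 / 596
before = expected_ns(profiled_miss_rate, HIT_BEFORE, MISS_BEFORE)
after = expected_ns(profiled_miss_rate, HIT_AFTER, MISS_AFTER)
```

With these particular timings the break-even point works out to 40 / 386, or roughly a 10% miss rate, well below the 87% observed in the test workload. Other hit and miss pairs from the tests would give a different threshold.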

Correctness verification report:

| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 622 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |

🌀 Generated Regression Tests and Runtime
import asyncio
from enum import Enum, auto

# imports
import pytest  # used for our unit tests
from skyvern.forge.async_operations import AsyncOperationPool

# Minimal stubs for AgentPhase and AsyncOperation for testing purposes
class AgentPhase(Enum):
    INIT = auto()
    RUNNING = auto()
    FINISHED = auto()
    ERROR = auto()

class AsyncOperation:
    def __init__(self, name):
        self.name = name

# unit tests

@pytest.fixture
def pool():
    # Create a fresh pool for each test
    p = AsyncOperationPool()
    p._operations = {}
    return p

# 1. Basic Test Cases

def test_get_operation_basic_existing(pool):
    # Setup: Add an operation for a known task_id and agent_phase
    op = AsyncOperation("op1")
    pool._operations["task1"] = {AgentPhase.INIT: op}
    # Should return the correct operation
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 535ns -> 575ns (6.96% slower)

def test_get_operation_basic_nonexistent_task(pool):
    # No operation for this task_id
    pool._operations["task1"] = {AgentPhase.INIT: AsyncOperation("op1")}
    codeflash_output = pool._get_operation("unknown_task", AgentPhase.INIT) # 786ns -> 440ns (78.6% faster)

def test_get_operation_basic_nonexistent_phase(pool):
    # Operation exists for INIT, not for FINISHED
    pool._operations["task1"] = {AgentPhase.INIT: AsyncOperation("op1")}
    codeflash_output = pool._get_operation("task1", AgentPhase.FINISHED) # 941ns -> 714ns (31.8% faster)

def test_get_operation_basic_multiple_phases(pool):
    # Multiple phases for a single task
    op1 = AsyncOperation("op1")
    op2 = AsyncOperation("op2")
    pool._operations["task1"] = {
        AgentPhase.INIT: op1,
        AgentPhase.RUNNING: op2
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 513ns -> 619ns (17.1% slower)
    codeflash_output = pool._get_operation("task1", AgentPhase.RUNNING) # 302ns -> 326ns (7.36% slower)

def test_get_operation_basic_multiple_tasks(pool):
    # Multiple tasks, each with their own operation
    op1 = AsyncOperation("op1")
    op2 = AsyncOperation("op2")
    pool._operations["task1"] = {AgentPhase.INIT: op1}
    pool._operations["task2"] = {AgentPhase.INIT: op2}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 495ns -> 567ns (12.7% slower)
    codeflash_output = pool._get_operation("task2", AgentPhase.INIT) # 273ns -> 318ns (14.2% slower)

# 2. Edge Test Cases

def test_get_operation_empty_operations_dict(pool):
    # No operations at all
    pool._operations = {}
    codeflash_output = pool._get_operation("any_task", AgentPhase.INIT) # 711ns -> 406ns (75.1% faster)

def test_get_operation_task_with_empty_phases(pool):
    # Task exists but has no phases
    pool._operations["task1"] = {}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 1.03μs -> 613ns (67.9% faster)

def test_get_operation_phase_with_none_value(pool):
    # Phase exists but value is None
    pool._operations["task1"] = {AgentPhase.INIT: None}
    # Should return None (the stored value, not because it's missing)
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 520ns -> 608ns (14.5% slower)

def test_get_operation_invalid_task_id_type(pool):
    # Pass an integer as task_id (should not raise, just not found)
    pool._operations["task1"] = {AgentPhase.INIT: AsyncOperation("op1")}
    codeflash_output = pool._get_operation(12345, AgentPhase.INIT) # 765ns -> 489ns (56.4% faster)

def test_get_operation_invalid_agent_phase_type(pool):
    # Pass a string as agent_phase (should not raise, just not found)
    pool._operations["task1"] = {AgentPhase.INIT: AsyncOperation("op1")}
    codeflash_output = pool._get_operation("task1", "RUNNING") # 779ns -> 517ns (50.7% faster)

def test_get_operation_task_id_case_sensitivity(pool):
    # Task IDs are case sensitive
    op = AsyncOperation("op1")
    pool._operations["Task1"] = {AgentPhase.INIT: op}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 749ns -> 435ns (72.2% faster)

def test_get_operation_phase_enum_vs_str(pool):
    # AgentPhase must be enum, not string
    op = AsyncOperation("op1")
    pool._operations["task1"] = {AgentPhase.INIT: op}
    codeflash_output = pool._get_operation("task1", "INIT") # 895ns -> 676ns (32.4% faster)

def test_get_operation_phase_not_enum(pool):
    # AgentPhase must be enum, not int
    op = AsyncOperation("op1")
    pool._operations["task1"] = {AgentPhase.INIT: op}
    codeflash_output = pool._get_operation("task1", 1) # 777ns -> 498ns (56.0% faster)

def test_get_operation_task_id_empty_string(pool):
    # Empty string as task_id
    op = AsyncOperation("op1")
    pool._operations[""] = {AgentPhase.INIT: op}
    codeflash_output = pool._get_operation("", AgentPhase.INIT) # 551ns -> 668ns (17.5% slower)
    codeflash_output = pool._get_operation(" ", AgentPhase.INIT) # 627ns -> 338ns (85.5% faster)

def test_get_operation_task_id_special_characters(pool):
    # Task ID with special characters
    op = AsyncOperation("op1")
    pool._operations["task@!#"] = {AgentPhase.INIT: op}
    codeflash_output = pool._get_operation("task@!#", AgentPhase.INIT) # 582ns -> 620ns (6.13% slower)
    codeflash_output = pool._get_operation("task@!#", AgentPhase.FINISHED) # 720ns -> 400ns (80.0% faster)

# 3. Large Scale Test Cases

def test_get_operation_many_tasks_and_phases(pool):
    # Create 100 tasks, each with 4 phases
    num_tasks = 100
    phases = [AgentPhase.INIT, AgentPhase.RUNNING, AgentPhase.FINISHED, AgentPhase.ERROR]
    for i in range(num_tasks):
        pool._operations[f"task{i}"] = {phase: AsyncOperation(f"op{i}_{phase.name}") for phase in phases}
    # Check random tasks and phases
    codeflash_output = pool._get_operation("task0", AgentPhase.ERROR).name # 583ns -> 661ns (11.8% slower)
    codeflash_output = pool._get_operation("task99", AgentPhase.FINISHED).name # 282ns -> 339ns (16.8% slower)
    codeflash_output = pool._get_operation("task50", AgentPhase.RUNNING).name # 182ns -> 218ns (16.5% slower)
    # Check for missing task
    codeflash_output = pool._get_operation("task100", AgentPhase.INIT) # 536ns -> 230ns (133% faster)
    # Check for missing phase
    codeflash_output = pool._get_operation("task0", None) # 415ns -> 322ns (28.9% faster)

def test_get_operation_large_phases_per_task(pool):
    # Intended to cover a task with many phases; AgentPhase has a fixed set of
    # members, so this exercises all four of them on a single task.
    phases = [AgentPhase.INIT, AgentPhase.RUNNING, AgentPhase.FINISHED, AgentPhase.ERROR]
    pool._operations["big_task"] = {phase: AsyncOperation(f"op_big_{phase.name}") for phase in phases}
    # Check all phases
    for phase in phases:
        codeflash_output = pool._get_operation("big_task", phase).name # 1.03μs -> 1.22μs (15.4% slower)
    # Check for a phase not in the dict
    codeflash_output = pool._get_operation("big_task", None) # 670ns -> 303ns (121% faster)

def test_get_operation_large_number_of_tasks(pool):
    # Create 1000 tasks, each with INIT phase
    for i in range(1000):
        pool._operations[f"t{i}"] = {AgentPhase.INIT: AsyncOperation(f"op{i}")}
    # Check a few random ones
    codeflash_output = pool._get_operation("t0", AgentPhase.INIT).name # 569ns -> 654ns (13.0% slower)
    codeflash_output = pool._get_operation("t999", AgentPhase.INIT).name # 276ns -> 293ns (5.80% slower)
    # Check for missing task
    codeflash_output = pool._get_operation("t1000", AgentPhase.INIT) # 632ns -> 230ns (175% faster)

def test_get_operation_performance_large_scale(pool):
    # Not a true performance test, but ensures function works with large dicts
    num_tasks = 500
    num_phases = 4
    for i in range(num_tasks):
        pool._operations[f"task{i}"] = {phase: AsyncOperation(f"op{i}_{phase.name}") for phase in AgentPhase}
    # Check last element
    codeflash_output = pool._get_operation(f"task{num_tasks-1}", AgentPhase.ERROR).name # 695ns -> 771ns (9.86% slower)
    # Check missing phase
    codeflash_output = pool._get_operation(f"task{num_tasks-1}", None) # 819ns -> 401ns (104% faster)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import asyncio
from enum import Enum, auto

# imports
import pytest  # used for our unit tests
from skyvern.forge.async_operations import AsyncOperationPool

# --- Minimal stubs for dependencies ---
class AgentPhase(Enum):
    INIT = auto()
    RUN = auto()
    FINISH = auto()
    ERROR = auto()
    CUSTOM = auto()

class AsyncOperation:
    def __init__(self, name):
        self.name = name
    def __eq__(self, other):
        return isinstance(other, AsyncOperation) and self.name == other.name
    def __repr__(self):
        return f"AsyncOperation({self.name!r})"

# --- Unit Tests ---

@pytest.fixture
def pool():
    # Return a fresh pool for each test
    pool = AsyncOperationPool()
    pool._operations = {}
    pool._aio_tasks = {}
    return pool

# ----------- 1. Basic Test Cases -----------

def test_get_operation_basic_existing(pool):
    # Test retrieval of an existing operation
    op = AsyncOperation("op1")
    pool._operations = {
        "task1": {AgentPhase.INIT: op}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 569ns -> 623ns (8.67% slower)

def test_get_operation_basic_non_existing_task(pool):
    # Test retrieval for a non-existent task_id
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation("task2", AgentPhase.INIT); result = codeflash_output # 768ns -> 434ns (77.0% faster)

def test_get_operation_basic_non_existing_phase(pool):
    # Test retrieval for a non-existent agent_phase
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.RUN); result = codeflash_output # 927ns -> 631ns (46.9% faster)

def test_get_operation_basic_multiple_phases(pool):
    # Test retrieval when several phases exist for a task
    op_init = AsyncOperation("init")
    op_run = AsyncOperation("run")
    pool._operations = {
        "task1": {
            AgentPhase.INIT: op_init,
            AgentPhase.RUN: op_run
        }
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 548ns -> 625ns (12.3% slower)
    codeflash_output = pool._get_operation("task1", AgentPhase.RUN) # 256ns -> 287ns (10.8% slower)

def test_get_operation_basic_multiple_tasks(pool):
    # Test retrieval when several tasks exist
    op1 = AsyncOperation("op1")
    op2 = AsyncOperation("op2")
    pool._operations = {
        "task1": {AgentPhase.INIT: op1},
        "task2": {AgentPhase.RUN: op2}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT) # 502ns -> 564ns (11.0% slower)
    codeflash_output = pool._get_operation("task2", AgentPhase.RUN) # 250ns -> 266ns (6.02% slower)

# ----------- 2. Edge Test Cases -----------

def test_get_operation_empty_operations_dict(pool):
    # Test retrieval when _operations is completely empty
    pool._operations = {}
    codeflash_output = pool._get_operation("any_task", AgentPhase.INIT); result = codeflash_output # 670ns -> 415ns (61.4% faster)

def test_get_operation_task_with_empty_phases(pool):
    # Test retrieval when a task exists but has no phases
    pool._operations = {
        "task1": {}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.RUN); result = codeflash_output # 1.02μs -> 672ns (51.5% faster)

def test_get_operation_with_unusual_task_id(pool):
    # Test with unusual task_id strings
    op = AsyncOperation("special")
    pool._operations = {
        "": {AgentPhase.INIT: op},
        "!@#$%^": {AgentPhase.RUN: op},
        "task with spaces": {AgentPhase.FINISH: op}
    }
    codeflash_output = pool._get_operation("", AgentPhase.INIT) # 480ns -> 626ns (23.3% slower)
    codeflash_output = pool._get_operation("!@#$%^", AgentPhase.RUN) # 393ns -> 425ns (7.53% slower)
    codeflash_output = pool._get_operation("task with spaces", AgentPhase.FINISH) # 206ns -> 218ns (5.50% slower)

def test_get_operation_with_custom_agent_phase(pool):
    # Test with a custom AgentPhase value
    op = AsyncOperation("custom")
    pool._operations = {
        "task1": {AgentPhase.CUSTOM: op}
    }
    codeflash_output = pool._get_operation("task1", AgentPhase.CUSTOM) # 492ns -> 599ns (17.9% slower)

def test_get_operation_case_sensitive_task_id(pool):
    # Test that task_id is case sensitive
    op_lower = AsyncOperation("lower")
    op_upper = AsyncOperation("upper")
    pool._operations = {
        "task": {AgentPhase.INIT: op_lower},
        "TASK": {AgentPhase.INIT: op_upper}
    }
    codeflash_output = pool._get_operation("task", AgentPhase.INIT) # 524ns -> 559ns (6.26% slower)
    codeflash_output = pool._get_operation("TASK", AgentPhase.INIT) # 312ns -> 374ns (16.6% slower)

def test_get_operation_phase_not_enum(pool):
    # Test with a non-enum agent_phase (should not raise, just return None)
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation("task1", "not_enum"); result = codeflash_output # 783ns -> 477ns (64.2% faster)

def test_get_operation_task_id_not_str(pool):
    # Test with a non-str task_id (should not raise, just return None)
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation(12345, AgentPhase.INIT); result = codeflash_output # 741ns -> 497ns (49.1% faster)

def test_get_operation_phase_is_none(pool):
    # Test with agent_phase as None (should not raise, just return None)
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation("task1", None); result = codeflash_output # 787ns -> 512ns (53.7% faster)

def test_get_operation_task_id_is_none(pool):
    # Test with task_id as None (should not raise, just return None)
    pool._operations = {
        "task1": {AgentPhase.INIT: AsyncOperation("op1")}
    }
    codeflash_output = pool._get_operation(None, AgentPhase.INIT); result = codeflash_output # 718ns -> 471ns (52.4% faster)

def test_get_operation_mutation_does_not_affect(pool):
    # Ensure that _get_operation does not mutate the internal dicts
    op = AsyncOperation("op1")
    pool._operations = {
        "task1": {AgentPhase.INIT: op}
    }
    before = pool._operations.copy()
    pool._get_operation("task1", AgentPhase.INIT) # 582ns -> 653ns (10.9% slower)

# ----------- 3. Large Scale Test Cases -----------

def test_get_operation_large_number_of_tasks_and_phases(pool):
    # Test with a large number of tasks and phases
    num_tasks = 500
    num_phases = 5
    phases = [AgentPhase.INIT, AgentPhase.RUN, AgentPhase.FINISH, AgentPhase.ERROR, AgentPhase.CUSTOM]
    pool._operations = {}
    for i in range(num_tasks):
        pool._operations[f"task_{i}"] = {}
        for j, phase in enumerate(phases):
            pool._operations[f"task_{i}"][phase] = AsyncOperation(f"op_{i}_{phase.name}")
    # Test random access
    for i in range(0, num_tasks, 100):
        for phase in phases:
            expected = AsyncOperation(f"op_{i}_{phase.name}")
            codeflash_output = pool._get_operation(f"task_{i}", phase)
    # Test non-existing task
    codeflash_output = pool._get_operation("task_9999", AgentPhase.INIT) # 837ns -> 301ns (178% faster)
    # Test non-existing phase
    codeflash_output = pool._get_operation("task_0", "NON_EXISTENT_PHASE") # 437ns -> 336ns (30.1% faster)

def test_get_operation_large_sparse_dict(pool):
    # Test with many tasks, only some have phases
    num_tasks = 500
    pool._operations = {}
    for i in range(num_tasks):
        if i % 50 == 0:
            pool._operations[f"task_{i}"] = {AgentPhase.INIT: AsyncOperation(f"op_{i}")}
    # Only every 50th task has an operation
    for i in range(num_tasks):
        if i % 50 == 0:
            expected = AsyncOperation(f"op_{i}")
            codeflash_output = pool._get_operation(f"task_{i}", AgentPhase.INIT)
        else:
            codeflash_output = pool._get_operation(f"task_{i}", AgentPhase.INIT)

def test_get_operation_large_task_id_strings(pool):
    # Test with very long task_id strings
    long_task_id = "task_" + "x" * 500
    op = AsyncOperation("long_op")
    pool._operations = {long_task_id: {AgentPhase.INIT: op}}
    codeflash_output = pool._get_operation(long_task_id, AgentPhase.INIT) # 514ns -> 615ns (16.4% slower)
    # Test with a slightly different long string
    codeflash_output = pool._get_operation(long_task_id + "y", AgentPhase.INIT) # 693ns -> 435ns (59.3% faster)

def test_get_operation_large_operation_objects(pool):
    # Test with large AsyncOperation objects (simulate by adding lots of data)
    class LargeAsyncOperation(AsyncOperation):
        def __init__(self, name):
            super().__init__(name)
            self.data = "x" * 1000  # Simulate large payload
        def __eq__(self, other):
            return isinstance(other, LargeAsyncOperation) and self.name == other.name and self.data == other.data
    op = LargeAsyncOperation("big_op")
    pool._operations = {"task1": {AgentPhase.INIT: op}}
    codeflash_output = pool._get_operation("task1", AgentPhase.INIT); result = codeflash_output # 529ns -> 612ns (13.6% slower)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, run git checkout codeflash/optimize-AsyncOperationPool._get_operation-mi89jm9m, commit your edits, and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 21, 2025 02:51
@codeflash-ai codeflash-ai bot added the "⚡️ codeflash" (Optimization PR opened by Codeflash AI) and "🎯 Quality: High" (Optimization Quality according to Codeflash) labels Nov 21, 2025