Skip to content

Refactor optimizer into FunctionOptimizer class #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,176 changes: 1,176 additions & 0 deletions codeflash/optimization/function_optimizer.py

Large diffs are not rendered by default.

1,178 changes: 30 additions & 1,148 deletions codeflash/optimization/optimizer.py

Large diffs are not rendered by default.

104 changes: 49 additions & 55 deletions tests/test_code_replacement.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import dataclasses
import os
from argparse import Namespace
from collections import defaultdict
from pathlib import Path

Expand All @@ -14,7 +13,8 @@
)
from codeflash.discovery.functions_to_optimize import FunctionToOptimize
from codeflash.models.models import FunctionParent
from codeflash.optimization.optimizer import Optimizer
from codeflash.optimization.function_optimizer import FunctionOptimizer
from codeflash.verification.verification_utils import TestConfig

os.environ["CODEFLASH_API_KEY"] = "cf-test-key"

Expand Down Expand Up @@ -766,24 +766,18 @@ def main_method(self):
return HelperClass(self.name).helper_method()
"""
file_path = Path(__file__).resolve()
opt = Optimizer(
Namespace(
project_root=file_path.parent.resolve(),
disable_telemetry=True,
tests_root="tests",
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=file_path.parent.resolve(),
)
)
func_top_optimize = FunctionToOptimize(
function_name="main_method", file_path=file_path, parents=[FunctionParent("MainClass", "ClassDef")]
)
original_code = file_path.read_text()
code_context = opt.get_code_optimization_context(
function_to_optimize=func_top_optimize, project_root=file_path.parent, original_source_code=original_code
).unwrap()
test_config = TestConfig(
tests_root=file_path.parent,
tests_project_rootdir=file_path.parent,
project_root_path=file_path.parent,
test_framework="pytest",
pytest_cmd="pytest",
)
func_optimizer = FunctionOptimizer(function_to_optimize=func_top_optimize, test_cfg=test_config)
code_context = func_optimizer.get_code_optimization_context().unwrap()
assert code_context.code_to_optimize_with_helpers == get_code_output


Expand Down Expand Up @@ -1013,35 +1007,35 @@ def to_name(self) -> str:
class TestResults(BaseModel):
def __iter__(self) -> Iterator[FunctionTestInvocation]:
return iter(self.test_results)

def __len__(self) -> int:
return len(self.test_results)

def __getitem__(self, index: int) -> FunctionTestInvocation:
return self.test_results[index]

def __setitem__(self, index: int, value: FunctionTestInvocation) -> None:
self.test_results[index] = value

def __delitem__(self, index: int) -> None:
del self.test_results[index]

def __contains__(self, value: FunctionTestInvocation) -> bool:
return value in self.test_results

def __bool__(self) -> bool:
return bool(self.test_results)

def __eq__(self, other: object) -> bool:
# Unordered comparison
if not isinstance(other, TestResults) or len(self) != len(other):
return False

# Increase recursion limit only if necessary
original_recursion_limit = sys.getrecursionlimit()
if original_recursion_limit < 5000:
sys.setrecursionlimit(5000)

for test_result in self:
other_test_result = other.get_by_id(test_result.id)
if other_test_result is None or not (
Expand All @@ -1054,10 +1048,10 @@ def __eq__(self, other: object) -> bool:
):
sys.setrecursionlimit(original_recursion_limit)
return False

sys.setrecursionlimit(original_recursion_limit)
return True

def get_test_pass_fail_report_by_type(self) -> dict[TestType, dict[str, int]]:
report = {test_type: {"passed": 0, "failed": 0} for test_type in TestType}
for test_result in self.test_results:
Expand Down Expand Up @@ -1105,8 +1099,8 @@ def get_test_pass_fail_report_by_type(self) -> dict[TestType, dict[str, int]]:
)

assert (
new_code
== """from __future__ import annotations
new_code
== """from __future__ import annotations
import sys
from codeflash.verification.comparator import comparator
from enum import Enum
Expand Down Expand Up @@ -1245,21 +1239,21 @@ def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
if len(X.data) == 0 or len(Y.data) == 0:
return np.array([])

X_np, Y_np = np.asarray(X.data), np.asarray(Y.data)
if X_np.shape[1] != Y_np.shape[1]:
raise ValueError(f"Number of columns in X and Y must be the same. X has shape {X_np.shape} and Y has shape {Y_np.shape}.")
X_norm = np.linalg.norm(X_np, axis=1, keepdims=True)
Y_norm = np.linalg.norm(Y_np, axis=1, keepdims=True)

norm_product = X_norm * Y_norm.T
norm_product[norm_product == 0] = np.inf # Prevent division by zero
dot_product = np.dot(X_np, Y_np.T)
similarity = dot_product / norm_product

# Any NaN or Inf values are set to 0.0
np.nan_to_num(similarity, copy=False)

return similarity
def cosine_similarity_top_k(
X: Matrix,
Expand All @@ -1270,15 +1264,15 @@ def cosine_similarity_top_k(
"""Row-wise cosine similarity with optional top-k and score threshold filtering."""
if len(X.data) == 0 or len(Y.data) == 0:
return [], []

score_array = cosine_similarity(X, Y)

sorted_idxs = np.argpartition(-score_array.flatten(), range(top_k or len(score_array.flatten())))[:(top_k or len(score_array.flatten()))]
sorted_idxs = sorted_idxs[score_array.flatten()[sorted_idxs] > (score_threshold if score_threshold is not None else -1)]

ret_idxs = [(x // score_array.shape[1], x % score_array.shape[1]) for x in sorted_idxs]
scores = score_array.flatten()[sorted_idxs].tolist()

return ret_idxs, scores
'''
preexisting_objects: list[tuple[str, list[FunctionParent]]] = find_preexisting_objects(original_code)
Expand Down Expand Up @@ -1311,8 +1305,8 @@ def cosine_similarity_top_k(
project_root_path=Path(__file__).parent.parent.resolve(),
)
assert (
new_code
== '''import numpy as np
new_code
== '''import numpy as np
from pydantic.dataclasses import dataclass
from typing import List, Optional, Tuple, Union
@dataclass(config=dict(arbitrary_types_allowed=True))
Expand Down Expand Up @@ -1343,15 +1337,15 @@ def cosine_similarity_top_k(
"""Row-wise cosine similarity with optional top-k and score threshold filtering."""
if len(X.data) == 0 or len(Y.data) == 0:
return [], []

score_array = cosine_similarity(X, Y)

sorted_idxs = np.argpartition(-score_array.flatten(), range(top_k or len(score_array.flatten())))[:(top_k or len(score_array.flatten()))]
sorted_idxs = sorted_idxs[score_array.flatten()[sorted_idxs] > (score_threshold if score_threshold is not None else -1)]

ret_idxs = [(x // score_array.shape[1], x % score_array.shape[1]) for x in sorted_idxs]
scores = score_array.flatten()[sorted_idxs].tolist()

return ret_idxs, scores
'''
)
Expand All @@ -1370,8 +1364,8 @@ def cosine_similarity_top_k(
)

assert (
new_helper_code
== '''import numpy as np
new_helper_code
== '''import numpy as np
from pydantic.dataclasses import dataclass
from typing import List, Optional, Tuple, Union
@dataclass(config=dict(arbitrary_types_allowed=True))
Expand All @@ -1381,21 +1375,21 @@ def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
if len(X.data) == 0 or len(Y.data) == 0:
return np.array([])

X_np, Y_np = np.asarray(X.data), np.asarray(Y.data)
if X_np.shape[1] != Y_np.shape[1]:
raise ValueError(f"Number of columns in X and Y must be the same. X has shape {X_np.shape} and Y has shape {Y_np.shape}.")
X_norm = np.linalg.norm(X_np, axis=1, keepdims=True)
Y_norm = np.linalg.norm(Y_np, axis=1, keepdims=True)

norm_product = X_norm * Y_norm.T
norm_product[norm_product == 0] = np.inf # Prevent division by zero
dot_product = np.dot(X_np, Y_np.T)
similarity = dot_product / norm_product

# Any NaN or Inf values are set to 0.0
np.nan_to_num(similarity, copy=False)

return similarity
def cosine_similarity_top_k(
X: Matrix,
Expand All @@ -1406,15 +1400,15 @@ def cosine_similarity_top_k(
"""Row-wise cosine similarity with optional top-k and score threshold filtering."""
if len(X.data) == 0 or len(Y.data) == 0:
return [], []

score_array = cosine_similarity(X, Y)

sorted_idxs = np.argpartition(-score_array.flatten(), range(top_k or len(score_array.flatten())))[:(top_k or len(score_array.flatten()))]
sorted_idxs = sorted_idxs[score_array.flatten()[sorted_idxs] > (score_threshold if score_threshold is not None else -1)]

ret_idxs = [(x // score_array.shape[1], x % score_array.shape[1]) for x in sorted_idxs]
scores = score_array.flatten()[sorted_idxs].tolist()

return ret_idxs, scores
'''
)
Expand Down Expand Up @@ -1481,7 +1475,7 @@ def test_future_aliased_imports_removal() -> None:

def test_0_diff_code_replacement():
original_code = """from __future__ import annotations

import numpy as np
def functionA():
return np.array([1, 2, 3])
Expand Down
Loading