28 changes: 1 addition & 27 deletions examples/mlx_metal_kernel_opt/README.md
@@ -1,4 +1,4 @@
-# 🎯 Qwen3-0.6B Custom Metal Kernel Optimization with OpenEvolve
+# 🎯Custom Metal Kernel Optimization with OpenEvolve
 
 **Evolving custom GPU kernels for Grouped Query Attention using MLX Metal kernels for Qwen3-0.6B on Apple Silicon**
 
@@ -416,29 +416,3 @@ python run_benchmarks.py --mode compare
 ---
 
 **🎯 This example demonstrates OpenEvolve's capability to discover genuine algorithmic improvements through evolutionary optimization, achieving measurable performance gains on real hardware with production-ready implementations.**
-
-## 🔧 **Recent Improvements**
-
-### **✅ Correct Terminology**
-- **Before**: Incorrect references to "chunked GQA processing"
-- **After**: Accurate descriptions of custom Metal kernel optimization
-- **Benefits**: Technical accuracy and clear understanding of actual discoveries
-
-### **✅ Comprehensive Testing**
-- **Before**: Basic performance measurement
-- **After**: 17-scenario comprehensive benchmark suite with statistical validation
-- **Benefits**: Robust performance analysis and reproducible results
-
-### **✅ Production Integration**
-- **Before**: Standalone optimization experiments
-- **After**: Full MLX-LM integration with seamless switching
-- **Benefits**: Real-world usability and easy adoption
-
-### **✅ Detailed Documentation**
-- **Before**: High-level optimization descriptions
-- **After**: Complete technical details with actual kernel code snippets
-- **Benefits**: Understanding, reproducibility, and further research
-
----
-
-**🚀 Ready for custom Metal kernel evolution with comprehensive benchmarking and detailed analysis!**
126 changes: 105 additions & 21 deletions openevolve/evaluator.py
@@ -134,6 +134,20 @@ async def evaluate_program(
                 # Process the result based on type
                 eval_result = self._process_evaluation_result(result)
 
+                # Check if this was a timeout and capture artifacts if enabled
+                if artifacts_enabled and program_id and eval_result.metrics.get("timeout") is True:
+                    if program_id not in self._pending_artifacts:
+                        self._pending_artifacts[program_id] = {}
+
+                    self._pending_artifacts[program_id].update(
+                        {
+                            "timeout": True,
+                            "timeout_duration": self.config.timeout,
+                            "failure_stage": "evaluation",
+                            "error_type": "timeout",
+                        }
+                    )
+
                 # Add LLM feedback if configured
                 llm_eval_result = None
                 if self.config.use_llm_feedback and self.llm_ensemble:
@@ -153,7 +167,8 @@ async def evaluate_program(
                     )
                     and program_id
                 ):
-                    self._pending_artifacts[program_id] = {}
+                    if program_id not in self._pending_artifacts:
+                        self._pending_artifacts[program_id] = {}
 
                     # Merge eval_result artifacts with llm artifacts if they exist
                     if eval_result.has_artifacts():
@@ -179,6 +194,21 @@ async def evaluate_program(
                 # Return just metrics for backward compatibility
                 return eval_result.metrics
 
+            except asyncio.TimeoutError:
+                # Handle timeout specially - don't retry, just return timeout result
+                logger.warning(f"Evaluation timed out after {self.config.timeout}s")
+
+                # Capture timeout artifacts if enabled
+                if artifacts_enabled and program_id:
+                    self._pending_artifacts[program_id] = {
+                        "timeout": True,
+                        "timeout_duration": self.config.timeout,
+                        "failure_stage": "evaluation",
+                        "error_type": "timeout",
+                    }
+
+                return {"error": 0.0, "timeout": True}
+
             except Exception as e:
                 last_exception = e
                 logger.warning(
@@ -192,6 +222,7 @@ async def evaluate_program(
                         "stderr": str(e),
                         "traceback": traceback.format_exc(),
                         "failure_stage": "evaluation",
+                        "attempt": attempt + 1,
                     }
 
                 # If this is not the last attempt, wait a bit before retrying
@@ -242,32 +273,36 @@ def get_pending_artifacts(self, program_id: str) -> Optional[Dict[str, Union[str
         """
         return self._pending_artifacts.pop(program_id, None)
 
-    @run_in_executor
-    def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
+    async def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
         """
-        Directly evaluate a program using the evaluation function
+        Directly evaluate a program using the evaluation function with timeout
 
         Args:
             program_path: Path to the program file
 
         Returns:
             Dictionary of metric name to score
+
+        Raises:
+            asyncio.TimeoutError: If evaluation exceeds timeout
+            Exception: If evaluation function raises an exception
         """
-        try:
-            # Run the evaluation with timeout
-            result = self.evaluate_function(program_path)
-
-            # Validate result
-            if not isinstance(result, dict):
-                logger.warning(f"Evaluation returned non-dictionary result: {result}")
-                return {"error": 0.0}
-
-            return result
-
-        except Exception as e:
-            logger.error(f"Error in direct evaluation: {str(e)}")
+
+        # Create a coroutine that runs the evaluation function in an executor
+        async def run_evaluation():
+            loop = asyncio.get_event_loop()
+            return await loop.run_in_executor(None, self.evaluate_function, program_path)
+
+        # Run the evaluation with timeout - let exceptions bubble up for retry handling
+        result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout)
+
+        # Validate result
+        if not isinstance(result, dict):
+            logger.warning(f"Evaluation returned non-dictionary result: {result}")
+            return {"error": 0.0}
+
+        return result
 
     async def _cascade_evaluate(
         self, program_path: str
     ) -> Union[Dict[str, float], EvaluationResult]:
@@ -299,10 +334,24 @@ async def _cascade_evaluate(
             if not hasattr(module, "evaluate_stage1"):
                 return await self._direct_evaluate(program_path)
 
-            # Run first stage
+            # Run first stage with timeout
             try:
-                stage1_result = await run_in_executor(module.evaluate_stage1)(program_path)
+
+                async def run_stage1():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage1, program_path)
+
+                stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout)
                 stage1_eval_result = self._process_evaluation_result(stage1_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s")
+                return EvaluationResult(
+                    metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True},
+                    artifacts={
+                        "failure_stage": "stage1",
+                        "timeout": True,
+                    },
+                )
             except Exception as e:
                 logger.error(f"Error in stage 1 evaluation: {str(e)}")
                 # Capture stage 1 failure as artifacts
@@ -325,10 +374,27 @@ async def _cascade_evaluate(
             if not hasattr(module, "evaluate_stage2"):
                 return stage1_eval_result
 
-            # Run second stage
+            # Run second stage with timeout
             try:
-                stage2_result = await run_in_executor(module.evaluate_stage2)(program_path)
+
+                async def run_stage2():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage2, program_path)
+
+                stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout)
                 stage2_eval_result = self._process_evaluation_result(stage2_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 2 evaluation timed out after {self.config.timeout}s")
+                # Capture stage 2 failure, but keep stage 1 results
+                stage1_eval_result.artifacts.update(
+                    {
+                        "stage2_timeout": True,
+                        "failure_stage": "stage2",
+                    }
+                )
+                stage1_eval_result.metrics["stage2_passed"] = 0.0
+                stage1_eval_result.metrics["timeout"] = True
+                return stage1_eval_result
             except Exception as e:
                 logger.error(f"Error in stage 2 evaluation: {str(e)}")
                 # Capture stage 2 failure, but keep stage 1 results
@@ -370,10 +436,27 @@ async def _cascade_evaluate(
             if not hasattr(module, "evaluate_stage3"):
                 return merged_result
 
-            # Run third stage
+            # Run third stage with timeout
             try:
-                stage3_result = await run_in_executor(module.evaluate_stage3)(program_path)
+
+                async def run_stage3():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage3, program_path)
+
+                stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout)
                 stage3_eval_result = self._process_evaluation_result(stage3_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s")
+                # Capture stage 3 failure, but keep previous results
+                merged_result.artifacts.update(
+                    {
+                        "stage3_timeout": True,
+                        "failure_stage": "stage3",
+                    }
+                )
+                merged_result.metrics["stage3_passed"] = 0.0
+                merged_result.metrics["timeout"] = True
+                return merged_result
             except Exception as e:
                 logger.error(f"Error in stage 3 evaluation: {str(e)}")
                 # Capture stage 3 failure, but keep previous results
@@ -398,8 +481,9 @@ async def _cascade_evaluate(
 
         except Exception as e:
             logger.error(f"Error in cascade evaluation: {str(e)}")
+            # Return proper cascade failure result instead of re-raising
             return EvaluationResult(
-                metrics={"error": 0.0},
+                metrics={"stage1_passed": 0.0, "error": 0.0},
                 artifacts={
                     "stderr": str(e),
                     "traceback": traceback.format_exc(),
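Every timeout path in this file follows the same pattern: run the synchronous evaluation function in an executor and bound the await with `asyncio.wait_for`. A minimal, self-contained sketch of that pattern (`slow_evaluate`, `evaluate_with_timeout`, and the 2-second timeout are illustrative, not part of the PR):

```python
import asyncio
import time


def slow_evaluate(program_path: str) -> dict:
    """Stand-in for a user-supplied evaluation function that may hang."""
    time.sleep(5)  # simulate an evaluation that runs too long
    return {"score": 1.0}


async def evaluate_with_timeout(program_path: str, timeout: float) -> dict:
    loop = asyncio.get_running_loop()
    try:
        # Run the sync function in the default thread-pool executor,
        # bounded by the configured timeout
        return await asyncio.wait_for(
            loop.run_in_executor(None, slow_evaluate, program_path),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Same sentinel shape the PR returns for timed-out evaluations
        return {"error": 0.0, "timeout": True}


print(asyncio.run(evaluate_with_timeout("program.py", timeout=2.0)))
# -> {'error': 0.0, 'timeout': True}
```

One caveat worth noting: `asyncio.wait_for` cancels the awaiting task on timeout, but the executor thread keeps running the sync function to completion in the background, so a hung evaluation still occupies a worker thread.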
64 changes: 63 additions & 1 deletion openevolve/utils/async_utils.py
@@ -32,6 +32,60 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
     return wrapper
 
 
+async def run_with_timeout(
+    coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any
+) -> Any:
+    """
+    Run a coroutine with a timeout, returning a default value on timeout
+
+    Args:
+        coro: Coroutine function to run
+        timeout: Timeout in seconds
+        *args: Arguments to pass to the coroutine
+        timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True})
+        **kwargs: Keyword arguments to pass to the coroutine
+
+    Returns:
+        Result of the coroutine or timeout_error_value on timeout
+    """
+    if timeout_error_value is None:
+        timeout_error_value = {"error": 0.0, "timeout": True}
+
+    try:
+        return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
+    except asyncio.TimeoutError:
+        logger.warning(f"Operation timed out after {timeout}s")
+        return timeout_error_value
+
+
+async def run_sync_with_timeout(
+    func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any
+) -> Any:
+    """
+    Run a synchronous function in an executor with a timeout
+
+    Args:
+        func: Synchronous function to run
+        timeout: Timeout in seconds
+        *args: Arguments to pass to the function
+        timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True})
+        **kwargs: Keyword arguments to pass to the function
+
+    Returns:
+        Result of the function or timeout_error_value on timeout
+    """
+    if timeout_error_value is None:
+        timeout_error_value = {"error": 0.0, "timeout": True}
+
+    try:
+        loop = asyncio.get_event_loop()
+        task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
+        return await asyncio.wait_for(task, timeout=timeout)
+    except asyncio.TimeoutError:
+        logger.warning(f"Sync operation timed out after {timeout}s")
+        return timeout_error_value
+
+
 async def gather_with_concurrency(
     n: int, *tasks: asyncio.Future, return_exceptions: bool = False
 ) -> List[Any]:
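Note that, unlike `_direct_evaluate` above (which lets `asyncio.TimeoutError` bubble up for retry handling), these helpers swallow the timeout and return a sentinel value. A hypothetical call site, assuming the helper is imported from `openevolve.utils.async_utils` (`my_evaluator` and `main` are illustrative names):

```python
import asyncio
import time

from openevolve.utils.async_utils import run_sync_with_timeout


def my_evaluator(path: str) -> dict:
    # Illustrative stand-in for a slow, synchronous evaluation function
    time.sleep(60)
    return {"score": 0.5}


async def main() -> None:
    # Positional args after the timeout are forwarded to the function;
    # on timeout the default sentinel {"error": 0.0, "timeout": True} comes back
    result = await run_sync_with_timeout(my_evaluator, 0.5, "program.py")
    if isinstance(result, dict) and result.get("timeout"):
        print("evaluation timed out:", result)


asyncio.run(main())
```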
@@ -116,9 +170,17 @@ class TaskPool:
     """
 
     def __init__(self, max_concurrency: int = 10):
-        self.semaphore = asyncio.Semaphore(max_concurrency)
+        self.max_concurrency = max_concurrency
+        self._semaphore: Optional[asyncio.Semaphore] = None
         self.tasks: List[asyncio.Task] = []
 
+    @property
+    def semaphore(self) -> asyncio.Semaphore:
+        """Lazy-initialize the semaphore when first needed"""
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrency)
+        return self._semaphore
+
     async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any:
         """
         Run a coroutine in the pool
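A plausible motivation for the lazy property: `asyncio.Semaphore` binds to an event loop when first used (and, on Python 3.9 and earlier, when constructed), so a `TaskPool` built at import time, before any loop exists, could end up with a semaphore attached to the wrong loop. A minimal sketch of the idea under that assumption (`LazyPool`, `worker`, and `main` are illustrative names, not the full `TaskPool`):

```python
import asyncio
from typing import Optional


class LazyPool:
    """Minimal sketch of the lazy-semaphore idea."""

    def __init__(self, max_concurrency: int = 10) -> None:
        self.max_concurrency = max_concurrency
        self._semaphore: Optional[asyncio.Semaphore] = None

    @property
    def semaphore(self) -> asyncio.Semaphore:
        if self._semaphore is None:
            # First touched inside a running loop, so it binds to that loop
            self._semaphore = asyncio.Semaphore(self.max_concurrency)
        return self._semaphore


POOL = LazyPool(2)  # safe at module import time: no Semaphore exists yet


async def worker(i: int) -> int:
    async with POOL.semaphore:  # semaphore is created here, on first use
        await asyncio.sleep(0.1)
        return i


async def main() -> list:
    return await asyncio.gather(*(worker(i) for i in range(4)))


print(asyncio.run(main()))  # -> [0, 1, 2, 3]
```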
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "openevolve"
-version = "0.0.1"
+version = "0.0.2"
 description = "Open-source implementation of AlphaEvolve"
 readme = "README.md"
 requires-python = ">=3.9"
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name="openevolve",
-    version="0.0.1",
+    version="0.0.2",
     packages=find_packages(),
     include_package_data=True,
 )