Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ repos:
entry: python pre_commit_hooks/check_copyright_header.py
language: python
verbose: true
exclude: '\.license$'
271 changes: 155 additions & 116 deletions README.md

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions pre_commit_hooks/check_copyright_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@ def check_files_have_updated_header(self, filenames: list) -> bool:
if not os.path.exists(filename):
continue

with open(filename, encoding="utf-8") as file:
first_line = file.readline()
second_line = file.readline()
# For JSON files, check for sidecar .license file
if filename.endswith(".json"):
license_file = filename + ".license"
if os.path.exists(license_file):
filename = license_file
else:
print(
f"ERROR: JSON file {filename} requires a sidecar "
f"{license_file} file with copyright header!"
)
has_outdated_headers = True
continue

try:
with open(filename, encoding="utf-8") as file:
first_line = file.readline()
second_line = file.readline()
except (UnicodeDecodeError, PermissionError) as err:
# Skip binary files and files without read permissions
print(f"WARN: Cannot check {filename}: {err}! ")
continue

# Handle Markdown vs others
header_line = second_line if filename.endswith(".md") else first_line
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ where = src
console_scripts =
mlia=mlia.cli.main:main
mlia-backend=mlia.cli.main:backend_main
mlia-target=mlia.cli.main:target_main

[flake8]
# ignored errors
Expand Down
171 changes: 171 additions & 0 deletions src/mlia/backend/corstone/performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
import subprocess # nosec
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import mlia
import mlia.core.output_schema as schema
from mlia.backend.errors import BackendExecutionFailed
from mlia.backend.repo import get_backend_repository
from mlia.utils.filesystem import get_mlia_resources
from mlia.utils.filesystem import sha256
from mlia.utils.proc import Command
from mlia.utils.proc import OutputLogger
from mlia.utils.proc import process_command_output
Expand Down Expand Up @@ -54,6 +58,173 @@ class PerformanceMetrics:
npu_axi1_rd_data_beat_received: int
npu_axi1_wr_data_beat_written: int | None = None

def to_standardized_output( # pylint: disable=too-many-locals
self,
model_path: Path,
backend_name: str,
target_config: dict[str, Any],
run_id: str | None = None,
timestamp: str | None = None,
cli_arguments: list[str] | None = None,
backend_config: dict[str, Any] | None = None,
) -> Any: # Returns StandardizedOutput but avoid circular import
"""Convert to standardized output format.

Args:
model_path: Path to the model file
backend_name: Name of the Corstone backend (e.g., "corstone-300")
target_config: Target configuration parameters
run_id: Optional run ID (will be generated if not provided)
timestamp: Optional ISO 8601 timestamp (will be generated if not provided)
cli_arguments: Optional CLI arguments used for the run
backend_config: Optional backend configuration parameters

Returns:
StandardizedOutput object with performance results
"""
# Generate run_id and timestamp if not provided
if run_id is None:
run_id = schema.StandardizedOutput.create_run_id()
if timestamp is None:
timestamp = schema.StandardizedOutput.create_timestamp()

# Create tool info
tool = schema.Tool(name="mlia", version=mlia.__version__)

# Create backend
backend = schema.Backend(
id=backend_name,
name=f"Corstone {backend_name.split('-')[1]}"
if "-" in backend_name
else backend_name,
version="unknown", # Corstone version comes from FVP executable
configuration=backend_config or {},
)

# Extract target info from config
target_type = target_config.get("target_type", backend_name)
profile_name = target_config.get("profile_name", target_type)
mac_config = target_config.get("mac", "unknown")

# Create target components - only NPU
components = []

# Add NPU component if target info available
npu_target = target_config.get("npu_target") or target_config.get("target")
if npu_target:
npu_family = npu_target.split("-")[0] if "-" in npu_target else "ethos-u"
npu_model = npu_target.split("-")[1] if "-" in npu_target else None

components.append(
schema.Component(
type=schema.ComponentType.NPU,
family=npu_family,
model=npu_model,
variant=str(mac_config) if mac_config != "unknown" else None,
)
)

target = schema.Target(
profile_name=profile_name,
target_type=target_type,
components=components,
configuration=target_config,
description=f"Corstone {backend_name} FVP simulation",
)

# Create model info
model_hash = sha256(model_path)
model_size = model_path.stat().st_size if model_path.exists() else None
suffix = model_path.suffix.lower()
if suffix == ".tflite":
model_format = "tflite"
elif suffix in [".vela", ".tflite.vela"]:
model_format = "vela"
else:
model_format = suffix.lstrip(".") or "unknown"

model = schema.Model(
name=model_path.name,
format=model_format,
hash=model_hash,
size_bytes=model_size,
)

# Create context
context = schema.Context(
cli_arguments=cli_arguments or [],
runtime_configuration=None,
git=None,
notes=None,
)

# Create performance metrics
metrics = [
schema.Metric(
name="npu_active_cycles",
value=float(self.npu_active_cycles),
unit="cycles",
),
schema.Metric(
name="npu_idle_cycles",
value=float(self.npu_idle_cycles),
unit="cycles",
),
schema.Metric(
name="npu_total_cycles",
value=float(self.npu_total_cycles),
unit="cycles",
),
schema.Metric(
name="npu_axi0_rd_data_beat_received",
value=float(self.npu_axi0_rd_data_beat_received),
unit="beats",
),
schema.Metric(
name="npu_axi0_wr_data_beat_written",
value=float(self.npu_axi0_wr_data_beat_written),
unit="beats",
),
schema.Metric(
name="npu_axi1_rd_data_beat_received",
value=float(self.npu_axi1_rd_data_beat_received),
unit="beats",
),
]

if self.npu_axi1_wr_data_beat_written is not None:
metrics.append(
schema.Metric(
name="npu_axi1_wr_data_beat_written",
value=float(self.npu_axi1_wr_data_beat_written),
unit="beats",
)
)

# Create result
result = schema.Result(
kind=schema.ResultKind.PERFORMANCE,
status=schema.ResultStatus.OK,
producer=backend.id,
warnings=[],
errors=[],
metrics=metrics,
mode=schema.ModeType.SIMULATED, # Corstone is simulation
)

return schema.StandardizedOutput(
schema_version=schema.SCHEMA_VERSION,
run_id=run_id,
timestamp=timestamp,
tool=tool,
target=target,
model=model,
context=context,
backends=[backend],
results=[result],
extensions={},
).to_dict()


class GenericInferenceOutputParser:
"""Generic inference runner output parser."""
Expand Down
Loading