Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions swanlab/data/porter/mounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
一般情况下,他与 Porter 一起使用,作为 Porter 的前置操作
"""

from typing import Optional
from typing import Optional, List

from swanlab.core_python import get_client, Client
from swanlab.data import namer as N
Expand Down Expand Up @@ -97,32 +97,41 @@ def execute(self):
# 列信息, [{error: dict or None, key:str, type:str, class:str}]
columns = columns_resp.get("list", [])
# key -> (column_type, column_class, error, latest step)
metrics: RemoteMetric = {}
for column in columns:
# 从列信息中获取指标信息
key = column["key"]
column_type = column["type"]
column_class = column["class"]
error = column.get("error", None)
if column_class == "SYSTEM" and not is_system_key(key):
# 只记录 sdk 生成的系统指标
continue
# 从总结数据中获取最新的 step
# 这里需要同时查找 media 和 scalar
latest_step = None
for scalar_summary in summaries.get("scalar") or []:
if scalar_summary["key"] == key:
latest_step = scalar_summary["step"]
break
if latest_step is None:
for media_summary in summaries.get("media") or []:
if media_summary["key"] == key:
latest_step = media_summary["step"]
break
# 极端情况下,总结数据中会不包含此key,此时 latest_step 仍为 None
# 具体情况就是列创建了,但是没有任何数据,这时候latest_step设置为-1即可,代表可接受所有step
metrics[key] = (column_type, column_class, error, latest_step or -1)
run_store.metrics = metrics
run_store.metrics = self.get_metrics(columns, summaries)

@staticmethod
def get_metrics(columns: List, summaries) -> "RemoteMetric":
    """
    Build the remote metric table from column descriptors and summary data.

    :param columns: column descriptors; each is a dict with ``key``, ``type``,
        ``class`` and an optional ``error`` entry. ``None`` is treated as empty.
    :param summaries: summary payload whose optional ``scalar`` and ``media``
        entries are lists of ``{"key": ..., "step": ...}`` dicts. ``None`` (or a
        null ``scalar`` / ``media`` entry) is treated as empty.
    :return: key -> (column_type, column_class, error, latest step)
    """
    # A null payload is handled the same way as a null "scalar"/"media" entry.
    summaries = summaries or {}

    # Build the key -> latest-step lookup once, so the column loop below does a
    # dict lookup instead of rescanning the summary lists for every column.
    # Scalar has priority over media: media is stored first, scalar overwrites.
    summary_steps = {}
    for section in ("media", "scalar"):
        for item in summaries.get(section) or []:
            summary_steps[item["key"]] = item["step"]

    metrics = {}
    for column in columns or []:
        key = column["key"]
        column_class = column["class"]
        # Among SYSTEM columns, only keys recognised by is_system_key
        # (i.e. generated by the SDK) are recorded.
        if column_class == "SYSTEM" and not is_system_key(key):
            continue
        latest_step = summary_steps.get(key)
        # A column can exist without any data, in which case it has no summary
        # entry; -1 then means "every step is acceptable". The explicit None
        # check keeps a legitimate step of 0 from being replaced by -1.
        if latest_step is None:
            latest_step = -1
        metrics[key] = (column["type"], column_class, column.get("error", None), latest_step)
    return metrics

def __exit__(self, exc_type, exc_val, exc_tb):
self._run_store = None
138 changes: 138 additions & 0 deletions test/unit/data/porter/test_mounter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import time

from swanlab.data.porter.mounter import Mounter


class TestMounterMetrics:
    """Unit tests for ``Mounter.get_metrics``."""

    def test_get_metrics_basic(self):
        """
        Scalar and media summaries each supply the latest step of their column.
        """
        columns = [
            {"key": "loss", "type": "float", "class": "SCALAR"},
            {"key": "image", "type": "image", "class": "MEDIA"},
        ]
        summaries = {
            "scalar": [{"key": "loss", "step": 10}],
            "media": [{"key": "image", "step": 5}],
        }

        metrics = Mounter.get_metrics(columns, summaries)

        assert metrics["loss"] == ("float", "SCALAR", None, 10)
        assert metrics["image"] == ("image", "MEDIA", None, 5)

    def test_get_metrics_priority(self):
        """
        A scalar summary takes precedence over a media summary for the same key.
        """
        columns = [
            {"key": "mixed", "type": "float", "class": "SCALAR"},
        ]
        summaries = {
            "scalar": [{"key": "mixed", "step": 20}],
            "media": [{"key": "mixed", "step": 15}],
        }

        metrics = Mounter.get_metrics(columns, summaries)

        assert metrics["mixed"] == ("float", "SCALAR", None, 20)

    def test_get_metrics_missing_summary(self):
        """
        A column with no corresponding summary gets step -1.
        """
        columns = [
            {"key": "empty", "type": "float", "class": "SCALAR"},
        ]
        summaries = {
            "scalar": [],
            "media": [],
        }

        metrics = Mounter.get_metrics(columns, summaries)

        assert metrics["empty"] == ("float", "SCALAR", None, -1)

    def test_get_metrics_system_filtering(self):
        """
        SYSTEM columns whose key is rejected by ``is_system_key`` are dropped.
        """
        columns = [
            {"key": "system/cpu", "type": "float", "class": "SYSTEM"},  # accepted by the mock
            {"key": "custom_system", "type": "float", "class": "SYSTEM"},  # rejected by the mock
            {"key": "normal", "type": "float", "class": "SCALAR"},
        ]
        summaries = {
            "scalar": [
                {"key": "system/cpu", "step": 100},
                {"key": "custom_system", "step": 50},
                {"key": "normal", "step": 1},
            ],
        }

        # is_system_key is patched where the mounter module looks it up, so the
        # test stays deterministic regardless of the real list of system keys.
        from unittest.mock import patch

        with patch("swanlab.data.porter.mounter.is_system_key") as mock_is_system_key:
            # True for "system/cpu", False for every other key.
            mock_is_system_key.side_effect = lambda k: k == "system/cpu"

            metrics = Mounter.get_metrics(columns, summaries)

            assert "system/cpu" in metrics
            assert "custom_system" not in metrics
            assert "normal" in metrics

    def test_get_metrics_error_handling(self):
        """
        A column's error payload is passed through; an empty summary gives step -1.
        """
        columns = [
            {"key": "broken", "type": "float", "class": "SCALAR", "error": {"msg": "failed"}},
        ]
        summaries = {}

        metrics = Mounter.get_metrics(columns, summaries)

        assert metrics["broken"] == ("float", "SCALAR", {"msg": "failed"}, -1)

    def test_get_metrics_large_scale(self, count=1_000_000):
        """
        Performance smoke test: ``count`` columns, half of them with a scalar summary.
        """
        columns = [{"key": f"metric_{i}", "type": "float", "class": "SCALAR"} for i in range(count)]

        # Only even-numbered metrics get a summary entry.
        summaries = {
            "scalar": [{"key": f"metric_{i}", "step": i} for i in range(0, count, 2)],
            "media": [],
        }

        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments the way a time.time() difference can.
        start_time = time.perf_counter()

        metrics = Mounter.get_metrics(columns, summaries)

        duration = time.perf_counter() - start_time

        # Spot-check correctness on a few items.
        assert metrics["metric_0"][3] == 0
        assert metrics["metric_2"][3] == 2
        assert metrics["metric_1"][3] == -1  # odd-numbered metrics have no summary

        # Deliberately loose bound (10s) to absorb CI variance.
        assert duration < 10.0, f"Performance too slow: {duration}s"