Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .idea/SwanLab.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/dictionaries/project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion swanlab/core_python/uploader/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def trace_metrics(
client = get_client()
# 遍历生成器产生的每一个数据块
for chunk in _generate_chunks(data, per_request_len):
# TODO: 暂时注释掉前置检查
# TODO: 暂时注释掉前置检查,与 upload_media_metrics 一致
# 如果在发送过程中 client 变成了 pending,则中断后续发送
# if client.pending:
# break
Expand Down
51 changes: 40 additions & 11 deletions swanlab/core_python/uploader/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
@description: 定义上传函数
"""

import inspect
from functools import wraps
from typing import List

from swanlab.log import swanlog
Expand All @@ -14,10 +16,37 @@
from ..client import get_client, safe_request, decode_response
from ...error import ApiError


def skip_if_empty(log_message: str):
"""
装饰器:检查第一个参数(列表)是否为空。
如果为空,打印 debug 日志并返回 None,不再执行原函数。
"""

def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
sig = inspect.signature(func)
# The list to check is the first parameter of the decorated function
param_name = next(iter(sig.parameters))
bound_args = sig.bind_partial(*args, **kwargs)
if param_name in bound_args.arguments:
data_list = bound_args.arguments[param_name]
if hasattr(data_list, "__len__") and len(data_list) == 0:
return swanlog.debug(log_message)

return func(*args, **kwargs)

return wrapper

return decorator


HOUSE_URL = '/house/metrics'


@safe_request
@skip_if_empty("No logs to upload.")
def upload_logs(logs: List[LogModel]):
"""
上传日志信息
Expand All @@ -27,13 +56,14 @@ def upload_logs(logs: List[LogModel]):
for log in logs:
metrics.extend([{"level": log['level'], **l} for l in log['contents']])
if len(metrics) == 0:
return swanlog.debug("No logs to upload.")
return swanlog.debug("No log metrics to upload.")
data = create_data(metrics, "log")
trace_metrics(HOUSE_URL, data)
return None


@safe_request
@skip_if_empty("No media metrics to upload.")
def upload_media_metrics(media_metrics: List[MediaModel]):
"""
上传指标的媒体数据
Expand All @@ -43,32 +73,34 @@ def upload_media_metrics(media_metrics: List[MediaModel]):
buffers = []
for media in media_metrics:
media.buffers and buffers.extend(media.buffers)
if not client.pending:
# TODO 与 trace_metrics一样,注释掉 pending 判断
# if not client.pending:
if len(buffers) > 0:
upload_to_cos(client, cuid=client.exp_id, buffers=buffers)
# 上传指标信息
trace_metrics(HOUSE_URL, create_data([x.to_dict() for x in media_metrics], MediaModel.type.value))
# 上传指标信息
trace_metrics(HOUSE_URL, create_data([x.to_dict() for x in media_metrics], MediaModel.type.value))


@safe_request
@skip_if_empty("No scalar metrics to upload.")
def upload_scalar_metrics(scalar_metrics: List[ScalarModel]):
"""
上传指标的标量数据
"""
data = create_data([x.to_dict() for x in scalar_metrics], ScalarModel.type.value)
# 上传指标信息
trace_metrics(HOUSE_URL, data)


@safe_request
@skip_if_empty("No files to upload.")
def upload_files(files: List[FileModel]):
"""
上传files文件夹中的内容
:param files: 文件列表,内部为文件信息(text/dict)
"""
http = get_client()
# 去重所有的FileModel,留下一个
if len(files) == 0:
swanlog.warning("No files to upload.")
return
file_model = FileModel.create(files)
# 如果没有文件需要上传,直接返回
if file_model.empty:
Expand All @@ -84,16 +116,13 @@ def upload_files(files: List[FileModel]):


@safe_request
@skip_if_empty("No columns to upload.")
def upload_columns(columns: List[ColumnModel]):
"""
批量上传并创建 columns,每个请求的列长度有一个最大值
"""
http = get_client()
url = f'/experiment/{http.exp_id}/columns'
# 如果列表长度为0,则跳过
if len(columns) == 0:
swanlog.debug("No columns to upload.")
return
# 分批上传
try:
trace_metrics(url, [x.to_dict() for x in columns], per_request_len=3000)
Expand Down