Feature/s3 #1375
Merged
7 commits
063f02b  Refactor COS upload and experiment state APIs  (SAKURA-CAT)
6151bcb  Reset buffer pointer before COS upload  (SAKURA-CAT)
55119b7  Move buffer seek to _upload function  (SAKURA-CAT)
6c5900b  Add retry logic to file upload and unit tests  (SAKURA-CAT)
5202478  Improve error handling in COS upload function  (SAKURA-CAT)
1881123  Improve retry backoff in upload_file function  (SAKURA-CAT)
86cce6d  Remove unused import and URL prefix logic  (SAKURA-CAT)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,4 +8,3 @@ | |
| """ | ||
|
|
||
| from .client import * | ||
| from .session import create_session | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| """ | ||
| @author: cunyue | ||
| @file: experiment.py | ||
| @time: 2025/12/11 18:37 | ||
| @description: Defines the backend API endpoints related to experiments | ||
| """ | ||
|
|
||
| from typing import Literal | ||
|
|
||
| from swanlab.core_python.client import Client | ||
|
|
||
|
|
||
| def update_experiment_state( | ||
|     client: Client, | ||
|     *, | ||
|     username: str, | ||
|     projname: str, | ||
|     cuid: str, | ||
|     state: Literal['FINISHED', 'CRASHED', 'ABORTED'], | ||
|     finished_at: str = None, | ||
| ): | ||
|     """ | ||
|     Update the experiment state. Note that this call marks the client as pending, indicating that the experiment has ended. | ||
|     :param client: a logged-in client instance | ||
|     :param username: username of the experiment's owner | ||
|     :param projname: name of the project the experiment belongs to | ||
|     :param cuid: unique identifier of the experiment | ||
|     :param state: experiment state | ||
|     :param finished_at: experiment end time in ISO 8601 format; if omitted, the current time is used | ||
|     """ | ||
|     put_data = { | ||
|         "state": state, | ||
|         "finishedAt": finished_at, | ||
|         "from": "sdk", | ||
|     } | ||
|     put_data = {k: v for k, v in put_data.items() if v is not None}  # drop keys whose value is None | ||
|     client.put(f"/project/{username}/{projname}/runs/{cuid}/state", put_data) | ||
|     client.pending = True | ||
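
The payload handling in `update_experiment_state` can be tried without a backend. The following is a minimal sketch, not the project's test code: `FakeClient` is a hypothetical stand-in for swanlab's `Client` that only records `put` calls, and the username, project name, and cuid are made-up values. The function body mirrors the diff above.

```python
from typing import Any, Dict, List, Literal, Optional, Tuple


class FakeClient:
    """Hypothetical stand-in for swanlab's Client; it only records PUT calls."""

    def __init__(self) -> None:
        self.pending = False
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def put(self, path: str, data: Dict[str, Any]) -> None:
        self.calls.append((path, data))


def update_experiment_state(
    client: FakeClient,
    *,
    username: str,
    projname: str,
    cuid: str,
    state: Literal["FINISHED", "CRASHED", "ABORTED"],
    finished_at: Optional[str] = None,
) -> None:
    # Same payload logic as the diff: optional fields are dropped when None.
    put_data = {"state": state, "finishedAt": finished_at, "from": "sdk"}
    put_data = {k: v for k, v in put_data.items() if v is not None}
    client.put(f"/project/{username}/{projname}/runs/{cuid}/state", put_data)
    client.pending = True  # the client is marked pending once the state is sent


client = FakeClient()
update_experiment_state(
    client, username="alice", projname="demo", cuid="abc123", state="FINISHED"
)
path, payload = client.calls[0]
print(path)     # /project/alice/demo/runs/abc123/state
print(payload)  # {'state': 'FINISHED', 'from': 'sdk'}
```

Because `finished_at` was not passed, the `finishedAt` key is absent from the request body rather than sent as `null`.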
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| """ | ||
| @author: cunyue | ||
| @file: service.py | ||
| @time: 2025/12/11 18:48 | ||
| @description: Service-related API endpoints | ||
| """ | ||
|
|
||
| import time | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from io import BytesIO | ||
| from typing import List, Tuple | ||
|
|
||
| import requests | ||
| from requests.exceptions import RequestException | ||
|
|
||
| from ..client import Client | ||
| from ...log import swanlog | ||
| from ...toolkit.models.data import MediaBuffer | ||
|
|
||
|
|
||
| def upload_file(*, url: str, buffer: BytesIO, max_retries=3): | ||
|     """ | ||
|     Upload a file to COS | ||
|     :param url: COS upload URL | ||
|     :param buffer: BytesIO object holding the file content | ||
|     :param max_retries: maximum number of upload attempts, including the first | ||
|     """ | ||
|     # Use a Session object so that retries can reuse the TCP connection | ||
|     with requests.Session() as session: | ||
|         for attempt in range(1, max_retries + 1): | ||
|             try: | ||
|                 buffer.seek(0) | ||
|                 response = session.put( | ||
|                     url, | ||
|                     data=buffer, | ||
|                     headers={'Content-Type': 'application/octet-stream'}, | ||
|                     timeout=30, | ||
|                 ) | ||
|                 response.raise_for_status() | ||
|                 return | ||
|             except RequestException: | ||
|                 swanlog.warning("Upload attempt {} failed for URL: {}".format(attempt, url)) | ||
|                 # On the last attempt, re-raise the exception | ||
|                 if attempt == max_retries: | ||
|                     raise | ||
|                 # Simple exponential backoff (wait 1s, 2s, 4s, ...) | ||
|                 time.sleep(2 ** (attempt - 1)) | ||
|
|
||
|
|
||
| def upload_to_cos(client: Client, *, cuid: str, buffers: List[MediaBuffer]): | ||
|     """ | ||
|     Upload files to COS | ||
|     :param client: the corresponding client instance | ||
|     :param cuid: experiment cuid | ||
|     :param buffers: media data buffers | ||
|     """ | ||
|     failed_buffers: List[Tuple[str, MediaBuffer]] = [] | ||
|     # 1. Request presigned URLs from the backend | ||
|     data, _ = client.post( | ||
|         '/resources/presigned/put', | ||
|         {"experimentId": cuid, "paths": [buffer.file_name for buffer in buffers]}, | ||
|     ) | ||
|     urls: List[str] = data['urls'] | ||
|     # 2. Upload concurrently | ||
|     # executor.submit may fail because the number of threads is limited or the pool has already been shut down | ||
|     # See issue https://github.com/SwanHubX/SwanLab/issues/889; in that case the files must be sent one by one | ||
|     with ThreadPoolExecutor(max_workers=10) as executor: | ||
|         futures = [] | ||
|         assert len(urls) == len(buffers), "URLs and buffers length mismatch" | ||
|         # 2.1 Submit the uploads to worker threads | ||
|         for index, buffer in enumerate(buffers): | ||
|             url = urls[index] | ||
|             try: | ||
|                 future = executor.submit(upload_file, url=url, buffer=buffer) | ||
|                 futures.append((future, url, buffer)) | ||
|             except RuntimeError: | ||
|                 failed_buffers.append((url, buffer)) | ||
|         # 2.2 Collect the results | ||
|         for future, url, buffer in futures: | ||
|             try: | ||
|                 future.result() | ||
|             except Exception as e: | ||
|                 swanlog.warning(f"Failed to upload {url}: {e}, will retry...") | ||
|                 failed_buffers.append((url, buffer)) | ||
|     # 3. Retry the failed buffers one by one | ||
|     if len(failed_buffers): | ||
|         swanlog.debug("Retrying failed buffers: {}".format(len(failed_buffers))) | ||
|         for url, buffer in failed_buffers: | ||
|             try: | ||
|                 upload_file(url=url, buffer=buffer) | ||
|             except Exception as e: | ||
|                 swanlog.error(f"Failed to upload {url}: {e}") | ||
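
The control flow of `upload_file` and `upload_to_cos` (rewind, retry with exponential backoff, concurrent submit, then a sequential pass over whatever failed) can be exercised without any network access. The sketch below is an illustration under assumptions, not the code from this PR: `upload_with_retry`, `upload_all`, `flaky_send`, and the `example.invalid` URLs are hypothetical names, the HTTP PUT is replaced by an injected `send` callable, and `sleep` is injected so the backoff delays are recorded instead of waited out.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Callable, Dict, List, Tuple

Send = Callable[[str, BytesIO], None]
Sleep = Callable[[float], None]


def upload_with_retry(send: Send, url: str, buffer: BytesIO, sleep: Sleep, max_attempts: int = 3) -> None:
    """Call send() up to max_attempts times, backing off 1s, 2s, 4s, ... between tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            buffer.seek(0)  # rewind so every attempt sends the full payload
            send(url, buffer)
            return
        except OSError:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller decide what to do
            sleep(2 ** (attempt - 1))


def upload_all(send: Send, jobs: List[Tuple[str, BytesIO]], sleep: Sleep) -> List[str]:
    """Upload concurrently, retry failures one by one, and return the URLs that still failed."""
    failed: List[Tuple[str, BytesIO]] = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for url, buffer in jobs:
            try:
                future = executor.submit(upload_with_retry, send, url, buffer, sleep)
                futures.append((future, url, buffer))
            except RuntimeError:  # the pool could not accept the task
                failed.append((url, buffer))
        for future, url, buffer in futures:
            try:
                future.result()
            except Exception:
                failed.append((url, buffer))
    lost: List[str] = []
    for url, buffer in failed:  # sequential second pass
        try:
            upload_with_retry(send, url, buffer, sleep)
        except Exception:
            lost.append(url)
    return lost


attempts: Dict[str, int] = {}
lock = threading.Lock()
delays: List[float] = []


def flaky_send(url: str, buffer: BytesIO) -> None:
    """Simulated transport: '/bad' always fails, other URLs fail once and then succeed."""
    with lock:
        attempts[url] = attempts.get(url, 0) + 1
        count = attempts[url]
    if url.endswith("/bad") or count < 2:
        raise OSError("simulated failure")
    buffer.read()


jobs = [
    ("https://example.invalid/a", BytesIO(b"payload a")),
    ("https://example.invalid/bad", BytesIO(b"payload b")),
]
lost = upload_all(flaky_send, jobs, sleep=delays.append)
print(lost)            # ['https://example.invalid/bad']
print(sorted(delays))  # [1, 1, 1, 2, 2]
```

The first URL succeeds on its second attempt inside the pool. The second exhausts three attempts in the pool and three more in the sequential pass, which is why a permanently failing upload costs two full backoff cycles under this design.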