Conversation

@Xuwznln
Collaborator

@Xuwznln Xuwznln commented Oct 16, 2025

Adapt the UUID resource manager to RegularContainer and WorkstationNode.
Fix the docs build.

Summary by Sourcery

Adapt resource UUID management across core components and workstation nodes, enhance Bioyond workstation drivers with deck support and a full-featured workflow orchestration API, and fix documentation CI and examples to build reliably under a conda/Mamba environment.

New Features:

  • Add deck parameter support in BioyondReactionStation and BioyondDispensingStation constructors
  • Introduce comprehensive workflow orchestration API in BioyondReactionStation (sequence querying, merging, processing and task creation)
  • Subclass RegularContainer from pylabrobot Container with default dimensions and load_state capability
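
The RegularContainer change can be pictured with a minimal sketch. The `Container` base below is a stand-in for `pylabrobot.resources.Container`, and the default dimensions are illustrative, not the values used in the PR:

```python
from typing import Any, Dict


class Container:
    """Stand-in for pylabrobot.resources.Container (illustrative only)."""

    def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs: Any):
        self.name = name
        self.size_x, self.size_y, self.size_z = size_x, size_y, size_z


class RegularContainer(Container):
    """Container with default dimensions and restorable state."""

    def __init__(self, name: str, size_x: float = 10.0, size_y: float = 10.0,
                 size_z: float = 10.0, **kwargs: Any):
        super().__init__(name, size_x, size_y, size_z, **kwargs)
        self.state: Dict[str, Any] = {}

    def load_state(self, state: Dict[str, Any]) -> None:
        # Restore previously serialized state (e.g. liquid contents).
        self.state.update(state)


c = RegularContainer("vial_1")
c.load_state({"volume_ul": 500})
```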

Bug Fixes:

  • Fix children mapping and UUID propagation in ResourceDictInstance and ResourceTracker
  • Correct GraphIO canonicalization for node name handling and deprecated sample_id

Enhancements:

  • Propagate device_uuid through BaseDeviceNode and ROS initialization to ensure consistent node and resource UUIDs
  • Refactor HostNode resource management to build and synchronize resource trees with UUID mappings and async callbacks
  • Improve append_to_workflow_sequence to parse JSON names and return success status
  • Streamline experiment example to use updated BioyondReactionStation API
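
The device_uuid propagation described above can be sketched as follows; the class and function names mirror the summary, but the bodies are illustrative assumptions rather than the actual unilabos implementation:

```python
import uuid
from typing import Optional


class BaseDeviceNode:
    def __init__(self, name: str, device_uuid: Optional[str] = None):
        # Use the externally assigned UUID when provided, so the node and its
        # resource-tree entries agree; fall back to a random one otherwise.
        self.uuid = device_uuid if device_uuid is not None else str(uuid.uuid4())
        self.name = name


def init_wrapper(name: str, device_uuid: Optional[str] = None) -> BaseDeviceNode:
    # Thread the caller-supplied UUID through to the node constructor.
    return BaseDeviceNode(name, device_uuid=device_uuid)


node = init_wrapper("pump_1", device_uuid="1234-abcd")
```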

CI:

  • Update GitHub Actions deploy-docs workflow to use Miniforge/Mamba environment and run Sphinx build under conda
  • Adjust conda-pack-build workflow to include source repository and .git history

Documentation:

  • Fix Sphinx configuration and organic_synthesis.md example command
  • Add log_level configuration in BasicConfig and adjust documentation dependencies installation

Xuwznln and others added 30 commits October 16, 2025 14:32
Refactored the Bioyond workstation classes to improve parameter handling and workflow management. Updated experiment.py to use BioyondReactionStation with deck and material mappings, and enhanced workflow step parameter mapping and execution logic. Adjusted JSON experiment configs, improved workflow sequence handling, and added UUID assignment to PLR materials. Removed unused station_config and material cache logic, and added detailed docstrings and debug output for workflow methods.
Fix resource parent not being found.
Map UUIDs for all resources.
@sourcery-ai

sourcery-ai bot commented Oct 16, 2025

Reviewer's Guide

This PR overhauls resource UUID management across node drivers and containers, refactors the Bioyond reaction station workflow pipeline to support deck injection and end-to-end task execution, and fixes documentation build jobs by migrating to a Conda/Mamba-based setup.

Sequence diagram for resource UUID mapping and update in HostNode initialization

```mermaid
sequenceDiagram
    participant HostNode
    participant ResourceDictInstance
    participant ResourceTreeInstance
    participant HTTPClient
    participant ResourceTracker

    HostNode->>ResourceDictInstance: get_resource_instance_from_dict(host_node_dict)
    HostNode->>ResourceTreeInstance: create ResourceTreeInstance(host_node_instance)
    HostNode->>HTTPClient: resource_tree_add(resources_config, "", True)
    HTTPClient-->>HostNode: returns uuid_mapping
    HostNode->>ResourceTracker: loop_update_uuid(resource_instance, uuid_mapping)
```
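
The `loop_update_uuid` step in the diagram amounts to walking the resource tree and rewriting each node's UUID from the server-returned mapping. A simplified sketch, with the key and signature assumed from the diagram rather than taken from the real code:

```python
from typing import Dict, List, Optional


class Resource:
    def __init__(self, name: str, uuid: str, children: Optional[List["Resource"]] = None):
        self.name = name
        self.uuid = uuid
        self.children = children or []


def loop_update_uuid(resource: Resource, uuid_map: Dict[str, str]) -> None:
    # Replace the locally generated UUID with the server-assigned one
    # (keyed here by resource name), then recurse into the children.
    if resource.name in uuid_map:
        resource.uuid = uuid_map[resource.name]
    for child in resource.children:
        loop_update_uuid(child, uuid_map)


root = Resource("deck", "tmp-1", [Resource("plate", "tmp-2")])
loop_update_uuid(root, {"deck": "srv-1", "plate": "srv-2"})
```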

Sequence diagram for BioyondReactionStation workflow execution

```mermaid
sequenceDiagram
    participant User
    participant BioyondReactionStation
    participant HardwareInterface

    User->>BioyondReactionStation: process_and_execute_workflow(workflow_name, task_name)
    BioyondReactionStation->>BioyondReactionStation: get_workflow_sequence()
    BioyondReactionStation->>BioyondReactionStation: process_web_workflows(web_workflow_json)
    BioyondReactionStation->>BioyondReactionStation: merge_sequence_workflow(merge_json)
    BioyondReactionStation->>HardwareInterface: workflow_step_query(workflow_query_json)
    BioyondReactionStation->>BioyondReactionStation: generate_task_param_values(workflow_params_structure)
    BioyondReactionStation->>HardwareInterface: create_order(task_json)
    BioyondReactionStation-->>User: returns result
```
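
The call sequence above can be condensed into an orchestration skeleton. Every helper here is a stub and the return shapes are assumptions based on the diagram, not the station's real API:

```python
import json


class ReactionStationSketch:
    """Illustrative skeleton of the process_and_execute_workflow pipeline."""

    def get_workflow_sequence(self):
        return ["liquid_feeding", "solid_feeding"]

    def process_web_workflows(self, payload: str) -> bool:
        return bool(json.loads(payload)["web_workflow_list"])

    def merge_sequence_workflow(self, payload: str) -> dict:
        return {"code": 1, "name": json.loads(payload)["name"]}

    def create_order(self, task_json: str) -> dict:
        return {"orders": json.loads(task_json)}

    def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
        # Each stage returns a falsy value on failure, so the pipeline can
        # bail out early and report which step broke.
        steps = self.get_workflow_sequence()
        if not self.process_web_workflows(json.dumps({"web_workflow_list": steps})):
            return {"success": False, "step": "process_web_workflows"}
        merged = self.merge_sequence_workflow(json.dumps({"name": workflow_name}))
        if not merged:
            return {"success": False, "step": "merge_sequence_workflow"}
        order = self.create_order(json.dumps([{"orderName": task_name}]))
        return {"success": True, "workflow": merged["name"], "task": order}


result = ReactionStationSketch().process_and_execute_workflow("demo", "run-1")
```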

Class diagram for updated RegularContainer and BioyondReactionStation

```mermaid
classDiagram
    class RegularContainer {
        +Dict kwargs
        +Dict state
        +load_state(state: Dict)
    }
    RegularContainer --|> Container

    class BioyondReactionStation {
        +__init__(config: dict = None, deck=None)
        +reactor_taken_out()
        +reactor_taken_in(assign_material_name, cutoff, temperature)
        +solid_feeding_vials(material_id, time, torque_variation, assign_material_name, temperature)
        +liquid_feeding_vials_non_titration(volumeFormula, assign_material_name, titration_type, time, torque_variation, temperature)
        +liquid_feeding_solvents(assign_material_name, volume, titration_type, time, torque_variation, temperature)
        +liquid_feeding_titration(volume_formula, assign_material_name, titration_type, time, torque_variation, temperature)
        +liquid_feeding_beaker(volume, assign_material_name, time, torque_variation, titrationType, temperature)
        +get_workflow_sequence()
        +workflow_step_query(workflow_id)
        +create_order(json_str)
        +process_and_execute_workflow(workflow_name, task_name)
        +merge_sequence_workflow(json_str)
        +generate_task_param_values(workflow_params_structure)
    }
    BioyondReactionStation --|> BioyondWorkstation
```

Class diagram for resource UUID management in ResourceDictInstance and ResourceTreeSet

```mermaid
classDiagram
    class ResourceDictInstance {
        +get_resource_instance_from_dict(content: Dict)
        +get_nested_dict()
        +children
        +res_content
    }
    class ResourceTreeSet {
        +trees
        +set_resource_uuid(resource, new_uuid)
        +loop_update_uuid(resource, uuid_map)
        +add_resource(resource)
        +to_plr_resources()
        +from_raw_list(tree_data)
    }
    ResourceTreeSet "1" o-- "*" ResourceDictInstance
```

File-Level Changes

Change | Details | Files
Overhaul BioyondReactionStation workflow pipeline
  • Introduce optional deck parameter and related debug logs in init
  • Unify append_to_workflow_sequence to parse JSON names and call pending_task_params directly
  • Add get_workflow_sequence, workflow_step_query and create_order interfaces
  • Implement process_and_execute_workflow, merge_sequence_workflow and generate_task_param_values to build and execute tasks
unilabos/devices/workstation/bioyond_studio/reaction_station.py
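
The merge step added here assembles a request body from the queued workflow IDs; the payload shape below follows the review's code context for `merge_sequence_workflow`, while the helper name and defaults are illustrative:

```python
def build_merge_request(workflow_sequence, name="merged_workflow",
                        step_parameters=None, variables=None):
    # Each queued workflow id becomes a {"id": ...} entry, matching the
    # merge-workflow-with-parameters API format shown in the review.
    return {
        "name": name,
        "workflows": [{"id": wid} for wid in workflow_sequence],
        "stepParameters": step_parameters or {},
        "variables": variables or {},
    }


payload = build_merge_request(["wf-1", "wf-2"], name="demo")
```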
Propagate device_uuid and refactor resource tracking
  • Add device_uuid parameter to init_wrapper and BaseROS2DeviceNode constructors
  • Set node.uuid from passed device_uuid instead of random generation
  • Implement set_resource_uuid classmethod and update loop_update_uuid, _collect_uuid_mapping, remove duplicate resources
  • Convert resource_tree callbacks to async and enhance logging
unilabos/ros/nodes/base_device_node.py
unilabos/ros/nodes/resource_tracker.py
unilabos/ros/initialize_device.py
unilabos/ros/utils/driver_creator.py
Enhance HostNode resource tree upload with UUID mapping
  • Construct and insert a host_node ResourceTree at startup
  • Use HTTPClient.resource_tree_add and resource_edge_add with timing logs and uuid remapping
  • Wrap resource add logic in try/except and pass device_uuid into BaseROS2DeviceNode init
unilabos/ros/nodes/presets/host_node.py
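
The try/except wrapping of the resource-tree upload can be sketched like this; the client class is a stand-in for the real HTTPClient and the mapping keys are invented for illustration:

```python
from typing import Dict


class FakeHTTPClient:
    """Stand-in for the real HTTPClient; returns a name -> uuid mapping."""

    def resource_tree_add(self, resources_config, parent_id, replace):
        return {"host_node": "uuid-host", "deck": "uuid-deck"}


def upload_resource_tree(client, resources_config) -> Dict[str, str]:
    # Wrap the upload so a transient server error does not abort startup;
    # on success the returned mapping is used to remap local UUIDs.
    try:
        return client.resource_tree_add(resources_config, "", True)
    except Exception as exc:
        print(f"resource_tree_add failed: {exc}")
        return {}


mapping = upload_resource_tree(FakeHTTPClient(), {"host_node": {}})
```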
Refactor RegularContainer and graphio resource handling
  • Make RegularContainer subclass pylabrobot.Container, initialize size defaults and add load_state
  • Deprecate old class via commented block
  • Warn on deprecated sample_id, enrich children keys and update type/category mapping in graphio
  • Generate missing unilabos_uuid for PLR resources
unilabos/resources/container.py
unilabos/resources/graphio.py
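
Generating a missing unilabos_uuid boils down to a guarded assignment; a minimal sketch (the dict-based resource shape is an assumption for illustration):

```python
import uuid


def ensure_unilabos_uuid(resource: dict) -> dict:
    # Assign a fresh UUID only when the resource arrived without one, so
    # server-assigned identifiers are never overwritten.
    if not resource.get("unilabos_uuid"):
        resource["unilabos_uuid"] = str(uuid.uuid4())
    return resource


generated = ensure_unilabos_uuid({"name": "plate_1"})
kept = ensure_unilabos_uuid({"name": "deck", "unilabos_uuid": "fixed"})
```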
Migrate docs build and packaging to Conda/Mamba
  • Switch setup-python to setup-miniconda and install via mamba
  • Use mamba run for pip operations and Sphinx build
  • Capture request/response JSON files in HTTP client resource_tree_add
  • Adjust docs/conf.py theme list
.github/workflows/deploy-docs.yml
.github/workflows/conda-pack-build.yml
unilabos/app/web/client.py
docs/conf.py
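
The docs-job migration looks roughly like the fragment below. Only the setup-miniconda action and the `mamba run` pattern come from this changelog; the step names, environment name, and requirements path are illustrative:

```yaml
- name: Set up Miniforge
  uses: conda-incubator/setup-miniconda@v3   # pin to a full commit SHA in practice
  with:
    miniforge-version: latest
    activate-environment: docs               # illustrative environment name

- name: Build docs
  shell: bash -l {0}
  run: |
    mamba run -n docs pip install -r docs/requirements.txt
    mamba run -n docs sphinx-build -b html docs docs/_build/html
```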
Misc workstation and experiment script refactors
  • Remove unused station_config from station init and adjust append_to_workflow_sequence logic
  • Add deck argument to dispensing station constructor
  • Update experiment.py to use BioyondReactionStation with deck and hardware_interface access
  • Minor formatting and type hinting fixes
unilabos/devices/workstation/bioyond_studio/station.py
unilabos/devices/workstation/bioyond_studio/dispensing_station.py
unilabos/devices/workstation/bioyond_studio/experiment.py

@Xuwznln Xuwznln merged commit a5e31a1 into main Oct 16, 2025
9 checks passed
@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

Blocking issues:

  • An action sourced from a third-party repository on GitHub is not pinned to a full length commit SHA. Pinning an action to a full length commit SHA is currently the only way to use an action as an immutable release. Pinning to a particular SHA helps mitigate the risk of a bad actor adding a backdoor to the action's repository, as they would need to generate a SHA-1 collision for a valid Git object payload. (link)

General comments:

  • Replace the many print(...) debug statements with structured logger calls at appropriate levels and remove leftover commented‐out or unused code to keep the codebase clean.
  • The process_and_execute_workflow and generate_task_param_values methods have grown very large and complex—consider refactoring them into smaller, focused helper functions for readability and easier testing.
  • There’s duplicated and commented legacy logic around resource tree initialization (especially in HostNode and RegularContainer)—clean up these blocks to avoid confusion and consolidate to a single clear implementation.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Replace the many `print(...)` debug statements with structured `logger` calls at appropriate levels and remove leftover commented‐out or unused code to keep the codebase clean.
- The `process_and_execute_workflow` and `generate_task_param_values` methods have grown very large and complex—consider refactoring them into smaller, focused helper functions for readability and easier testing.
- There’s duplicated and commented legacy logic around resource tree initialization (especially in HostNode and RegularContainer)—clean up these blocks to avoid confusion and consolidate to a single clear implementation.

## Individual Comments

### Comment 1
<location> `unilabos/resources/graphio.py:55-57` </location>
<code_context>
         if not node.get("type"):
             node["type"] = "device"
             print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
-        if not node.get("name"):
+        if node.get("name", None) is None:
             node["name"] = node.get("id")
             print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
</code_context>

<issue_to_address>
**nitpick:** Changing name existence check to explicit None comparison may allow empty string names.

Empty strings for 'name' will now bypass the defaulting logic. Please confirm if this change is intentional.
</issue_to_address>
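
The behavioral difference flagged here is easy to demonstrate: `node.get("name", None) is None` treats an empty string as present, while the old truthiness check does not:

```python
def default_name_truthy(node: dict) -> dict:
    if not node.get("name"):            # old check: catches None AND ""
        node["name"] = node.get("id")
    return node


def default_name_is_none(node: dict) -> dict:
    if node.get("name", None) is None:  # new check: catches only None
        node["name"] = node.get("id")
    return node


a = default_name_truthy({"id": "n1", "name": ""})
b = default_name_is_none({"id": "n1", "name": ""})
```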

### Comment 2
<location> `unilabos/resources/graphio.py:73-74` </location>
<code_context>
             z = node.pop("z", None)
             if z is not None:
                 node["position"]["position"]["z"] = z
+        if "sample_id" in node:
+            sample_id = node.pop("sample_id")
+            if sample_id:
</code_context>

<issue_to_address>
**suggestion:** Logging error for deprecated sample_id may be too severe for legacy data.

Consider lowering the log level to warning or info for legacy sample_id values unless this is a critical error.

```suggestion
            if sample_id:
                logger.warning(f"{node}的sample_id参数已弃用,sample_id: {sample_id}")
```
</issue_to_address>

### Comment 3
<location> `unilabos/app/main.py:216` </location>
<code_context>
     print_status(f"当前工作目录为 {working_dir}", "info")
     load_config_from_file(config_path)
+
+    # 根据配置重新设置日志级别
+    from unilabos.utils.log import configure_logger, logger
+
</code_context>

<issue_to_address>
**suggestion:** Dynamically setting log level from config may cause unexpected verbosity changes.

Validate the log level from the config and set a reasonable default to avoid issues with logging verbosity.
</issue_to_address>
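
Validating the configured level before applying it could look like this minimal sketch; the config key `log_level` comes from the PR summary, while the helper itself is an assumption:

```python
import logging

VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def resolve_log_level(config: dict, default: str = "INFO") -> int:
    # Accept only known level names from the config; anything else falls
    # back to the default instead of silently changing verbosity.
    name = str(config.get("log_level", default)).upper()
    if name not in VALID_LEVELS:
        name = default
    return getattr(logging, name)


level = resolve_log_level({"log_level": "debug"})
fallback = resolve_log_level({"log_level": "verbose"})
```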

### Comment 4
<location> `.github/workflows/deploy-docs.yml:45` </location>
<code_context>
        uses: conda-incubator/setup-miniconda@v3
</code_context>

<issue_to_address>
**security (yaml.github-actions.security.third-party-action-not-pinned-to-commit-sha):** An action sourced from a third-party repository on GitHub is not pinned to a full length commit SHA. Pinning an action to a full length commit SHA is currently the only way to use an action as an immutable release. Pinning to a particular SHA helps mitigate the risk of a bad actor adding a backdoor to the action's repository, as they would need to generate a SHA-1 collision for a valid Git object payload.

*Source: opengrep*
</issue_to_address>
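
The fix for this finding is mechanical: replace the mutable tag with the full commit SHA of the release you trust. The SHA below is a placeholder, not the real v3 commit:

```yaml
# Pin third-party actions to a full commit SHA instead of a mutable tag.
# Placeholder SHA - look up the actual commit for the release you trust.
- uses: conda-incubator/setup-miniconda@0123456789abcdef0123456789abcdef01234567  # v3
```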

### Comment 5
<location> `unilabos/ros/nodes/base_device_node.py:960-962` </location>
<code_context>
                            final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
                                self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
                            ]

</code_context>

<issue_to_address>
**suggestion (code-quality):** Swap if/else branches of if expression to remove negation ([`swap-if-expression`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/swap-if-expression))

```suggestion
                            final_resources = [
                                self.resource_tracker.figure_resource({"name": res.name}, try_mode=False)
                                for res in queried_resources
                            ] if is_sequence else self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
```

<br/><details><summary>Explanation</summary>Negated conditions are more difficult to read than positive ones, so it is best
to avoid them where we can. By swapping the `if` and `else` conditions around we
can invert the condition and make it positive.
</details>
</issue_to_address>

### Comment 6
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:331-334` </location>
<code_context>
    def get_workflow_sequence(self) -> List[str]:
        """获取当前工作流执行顺序

        Returns:
            工作流名称列表
        """
        id_to_name = {workflow_id: name for name, workflow_id in self.workflow_mappings.items()}
        workflow_names = []
        for workflow_id in self.workflow_sequence:
            workflow_names.append(id_to_name.get(workflow_id, workflow_id))
        return workflow_names

</code_context>

<issue_to_address>
**suggestion (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))

```suggestion
        return [
            id_to_name.get(workflow_id, workflow_id)
            for workflow_id in self.workflow_sequence
        ]
```
</issue_to_address>

### Comment 7
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:374` </location>
<code_context>
    def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
        web_workflow_list = self.get_workflow_sequence()
        workflow_name = workflow_name

        pending_params_backup = self.pending_task_params.copy()
        print(f"保存pending_task_params副本,共{len(pending_params_backup)}个参数")

        # 1. 处理网页工作流列表
        print(f"处理网页工作流列表: {web_workflow_list}")
        web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
        workflows_result = self.process_web_workflows(web_workflow_json)

        if not workflows_result:
            error_msg = "处理网页工作流列表失败"
            print(error_msg)
            result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "process_web_workflows"})
            return result

        # 2. 合并工作流序列
        print(f"合并工作流序列,名称: {workflow_name}")
        merge_json = json.dumps({"name": workflow_name})
        merged_workflow = self.merge_sequence_workflow(merge_json)
        print(f"合并工作流序列结果: {merged_workflow}")

        if not merged_workflow:
            error_msg = "合并工作流序列失败"
            print(error_msg)
            result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "merge_sequence_workflow"})
            return result

        # 3. 合并所有参数并创建任务
        # 新API只返回状态信息,需要适配处理
        if isinstance(merged_workflow, dict) and merged_workflow.get("code") == 1:
            # 新API返回格式:{code: 1, message: "", timestamp: 0}
            # 使用传入的工作流名称和生成的临时ID
            final_workflow_name = workflow_name
            workflow_id = f"merged_{workflow_name}_{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}"
            print(f"新API合并成功,使用工作流创建任务: {final_workflow_name} (临时ID: {workflow_id})")
        else:
            # 旧API返回格式:包含详细工作流信息
            final_workflow_name = merged_workflow.get("name", workflow_name)
            workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
            print(f"旧API格式,使用工作流创建任务: {final_workflow_name} (ID: {workflow_id})")

        if not workflow_id:
            error_msg = "无法获取工作流ID"
            print(error_msg)
            result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "get_workflow_id"})
            return result

        workflow_query_json = json.dumps({"workflow_id": workflow_id})
        workflow_params_structure = self.workflow_step_query(workflow_query_json)

        self.pending_task_params = pending_params_backup
        print(f"恢复pending_task_params,共{len(self.pending_task_params)}个参数")

        param_values = self.generate_task_param_values(workflow_params_structure)

        task_params = [{
            "orderCode": f"BSO{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}",
            "orderName": f"实验-{self.hardware_interface.get_current_time_iso8601()[:10].replace('-', '')}",
            "workFlowId": workflow_id,
            "borderNumber": 1,
            "paramValues": param_values,
            "extendProperties": ""
        }]

        task_json = json.dumps(task_params)
        print(f"创建任务参数: {type(task_json)}")
        result = self.create_order(task_json)

        if not result:
            error_msg = "创建任务失败"
            print(error_msg)
            result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "create_order"})
            return result

        print(f"任务创建成功: {result}")
        self.pending_task_params.clear()
        print("已清空pending_task_params")

        return {
            "success": True,
            "workflow": {"name": final_workflow_name, "id": workflow_id},
            "task": result,
            "method": "process_and_execute_workflow"
        }

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Inline variable that is immediately returned [×3] ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
- Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>

### Comment 8
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:490-491` </location>
<code_context>
    def merge_sequence_workflow(self, json_str: str) -> dict:
        """合并当前工作流序列

        Args:
            json_str: 包含name等参数的JSON字符串

        Returns:
            合并结果
        """
        try:
            data = json.loads(json_str)
            name = data.get("name", "合并工作流")
            step_parameters = data.get("stepParameters", {})
            variables = data.get("variables", {})
        except json.JSONDecodeError:
            return {}

        if not self.workflow_sequence:
            print("工作流序列为空,无法合并")
            return {}

        # 将工作流ID列表转换为新API要求的格式
        workflows = [{"id": workflow_id} for workflow_id in self.workflow_sequence]

        # 构建新的API参数格式
        params = {
            "name": name,
            "workflows": workflows,
            "stepParameters": step_parameters,
            "variables": variables
        }

        # 使用新的API接口
        response = self.hardware_interface.post(
            url=f'{self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters',
            params={
                "apiKey": self.hardware_interface.api_key,
                "requestTime": self.hardware_interface.get_current_time_iso8601(),
                "data": params,
            })

        if not response or response['code'] != 1:
            return {}
        return response.get("data", {})

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
</issue_to_address>

### Comment 9
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:494` </location>
<code_context>
    def generate_task_param_values(self, workflow_params_structure: dict) -> dict:
        """生成任务参数值

        根据工作流参数结构和待处理的任务参数,生成最终的任务参数值

        Args:
            workflow_params_structure: 工作流参数结构

        Returns:
            任务参数值字典
        """
        if not workflow_params_structure:
            print("workflow_params_structure为空")
            return {}

        data = workflow_params_structure

        # 从pending_task_params中提取实际参数值,按DisplaySectionName和Key组织
        pending_params_by_section = {}
        print(f"开始处理pending_task_params,共{len(self.pending_task_params)}个任务参数组")

        # 获取工作流执行顺序,用于按顺序匹配参数
        workflow_sequence = self.get_workflow_sequence()
        print(f"工作流执行顺序: {workflow_sequence}")

        workflow_index = 0

        # 遍历所有待处理的任务参数
        for i, task_param in enumerate(self.pending_task_params):
            if 'param_values' in task_param:
                print(f"处理第{i+1}个任务参数组,包含{len(task_param['param_values'])}个步骤")

                if workflow_index < len(workflow_sequence):
                    current_workflow = workflow_sequence[workflow_index]
                    section_name = WORKFLOW_TO_SECTION_MAP.get(current_workflow)
                    print(f"  匹配到工作流: {current_workflow} -> {section_name}")
                    workflow_index += 1
                else:
                    print(f"  警告: 参数组{i+1}超出了工作流序列范围")
                    continue

                if not section_name:
                    print(f"  警告: 工作流{current_workflow}没有对应的DisplaySectionName")
                    continue

                if section_name not in pending_params_by_section:
                    pending_params_by_section[section_name] = {}

                # 处理每个步骤的参数
                for step_id, param_list in task_param['param_values'].items():
                    print(f"    步骤ID: {step_id},参数数量: {len(param_list)}")

                    for param_item in param_list:
                        key = param_item.get('Key', '')
                        value = param_item.get('Value', '')
                        m = param_item.get('m', 0)
                        n = param_item.get('n', 0)
                        print(f"    参数: {key} = {value} (m={m}, n={n}) -> 分组到{section_name}")

                        param_key = f"{section_name}.{key}"
                        if param_key not in pending_params_by_section[section_name]:
                            pending_params_by_section[section_name][param_key] = []

                        pending_params_by_section[section_name][param_key].append({
                            'value': value,
                            'm': m,
                            'n': n
                        })

        print(f"pending_params_by_section构建完成,包含{len(pending_params_by_section)}个分组")

        # 收集所有参数,过滤TaskDisplayable为0的项
        filtered_params = []

        for step_id, step_info in data.items():
            if isinstance(step_info, list):
                for step_item in step_info:
                    param_list = step_item.get("parameterList", [])
                    for param in param_list:
                        if param.get("TaskDisplayable") == 0:
                            continue

                        param_with_step = param.copy()
                        param_with_step['step_id'] = step_id
                        param_with_step['step_name'] = step_item.get("name", "")
                        param_with_step['step_m'] = step_item.get("m", 0)
                        param_with_step['step_n'] = step_item.get("n", 0)
                        filtered_params.append(param_with_step)

        # 按DisplaySectionIndex排序
        filtered_params.sort(key=lambda x: x.get('DisplaySectionIndex', 0))

        # 生成参数映射
        param_mapping = {}
        step_params = {}
        for param in filtered_params:
            step_id = param['step_id']
            if step_id not in step_params:
                step_params[step_id] = []
            step_params[step_id].append(param)

        # 为每个步骤生成参数
        for step_id, params in step_params.items():
            param_list = []
            for param in params:
                key = param.get('Key', '')
                display_section_index = param.get('DisplaySectionIndex', 0)
                step_m = param.get('step_m', 0)
                step_n = param.get('step_n', 0)

                section_name = param.get('DisplaySectionName', '')
                param_key = f"{section_name}.{key}"

                if section_name in pending_params_by_section and param_key in pending_params_by_section[section_name]:
                    pending_param_list = pending_params_by_section[section_name][param_key]
                    if pending_param_list:
                        pending_param = pending_param_list[0]
                        value = pending_param['value']
                        m = step_m
                        n = step_n
                        print(f"      匹配成功: {section_name}.{key} = {value} (m={m}, n={n})")
                        pending_param_list.pop(0)
                    else:
                        value = "1"
                        m = step_m
                        n = step_n
                        print(f"      匹配失败: {section_name}.{key},参数列表为空,使用默认值 = {value}")
                else:
                    value = "1"
                    m = display_section_index
                    n = step_n
                    print(f"      匹配失败: {section_name}.{key},使用默认值 = {value} (m={m}, n={n})")

                param_item = {
                    "m": m,
                    "n": n,
                    "key": key,
                    "value": str(value).strip()
                }
                param_list.append(param_item)

            if param_list:
                param_mapping[step_id] = param_list

        print(f"生成任务参数值,包含 {len(param_mapping)} 个步骤")
        return param_mapping
</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Hoist repeated code outside conditional statement [×2] ([`hoist-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-statement-from-if/))
- Low code quality found in BioyondReactionStation.generate\_task\_param\_values - 6% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>Explanation</summary>

The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

- Reduce the function length by extracting pieces of functionality out into
  their own functions. This is the most important thing you can do - ideally a
  function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
  sits together within the function rather than being scattered.</details>
</issue_to_address>
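As a hedged illustration of the extraction advice above, the match/fallback branch of the loop could be pulled into one small helper so the loop body stays short (the helper name and `pending` shape are hypothetical, not from the PR):

```python
def resolve_param(pending, section_index, step_n, key):
    """Pop the next pending (value, m) pair if one exists; otherwise fall
    back to the default value "1", mirroring the original else-branches.

    Returns a (value, m, n) tuple. This is a sketch of hoisting the
    repeated default assignment out of the nested conditionals.
    """
    if pending:
        value, m = pending.pop(0)
        return value, m, step_n
    # Guard clause above means the fallback sits at one indentation level.
    return "1", section_index, step_n
```

The caller then builds each `param_item` from the returned tuple, which removes the duplicated `value = "1"` branches that Sourcery flagged.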

### Comment 10
<location> `unilabos/devices/workstation/bioyond_studio/station.py:331-333` </location>
<code_context>
    def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
        # Check whether the string is JSON-formatted
        actual_workflow_name = web_workflow_name
        if web_workflow_name.startswith('{') and web_workflow_name.endswith('}'):
            try:
                data = json.loads(web_workflow_name)
                actual_workflow_name = data.get("web_workflow_name", web_workflow_name)
                print(f"Parsed JSON-formatted workflow name: {web_workflow_name} -> {actual_workflow_name}")
            except json.JSONDecodeError:
                print(f"JSON parsing failed, using raw string: {web_workflow_name}")

        workflow_id = self._get_workflow(actual_workflow_name)
        if workflow_id:
            self.workflow_sequence.append(workflow_id)
            print(f"Added workflow to execution sequence: {actual_workflow_name} -> {workflow_id}")
            return True
        return False

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))

```suggestion

        if workflow_id := self._get_workflow(actual_workflow_name):
```
</issue_to_address>
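Applied to the whole method, the named-expression form reads as below. This is a minimal sketch with a stand-in class, since the real `BioyondReactionStation` and its `_get_workflow` lookup live elsewhere in the PR:

```python
import json

class WorkflowStationSketch:
    """Minimal stand-in for BioyondReactionStation to show the walrus form."""

    def __init__(self, known):
        self._known = known          # workflow name -> workflow id
        self.workflow_sequence = []

    def _get_workflow(self, name):
        return self._known.get(name)

    def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
        actual = web_workflow_name
        if web_workflow_name.startswith('{') and web_workflow_name.endswith('}'):
            try:
                actual = json.loads(web_workflow_name).get(
                    "web_workflow_name", web_workflow_name
                )
            except json.JSONDecodeError:
                pass  # fall back to the raw string
        # Named expression merges the lookup and the truthiness test.
        if workflow_id := self._get_workflow(actual):
            self.workflow_sequence.append(workflow_id)
            return True
        return False
```

The walrus operator keeps `workflow_id` scoped to the branch that uses it, which is exactly what the Sourcery rule is after.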


Comment on lines -55 to 57
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
if not node.get("name"):
if node.get("name", None) is None:
node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")

nitpick: Changing name existence check to explicit None comparison may allow empty string names.

Empty strings for 'name' will now bypass the defaulting logic. Please confirm if this change is intentional.
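The behavioral difference is easy to demonstrate: with the explicit `is None` check an empty string survives as the node name, whereas the truthiness check replaces it with the id (helper names below are illustrative, not from the PR):

```python
def default_name_truthy(node):
    # Old check: any falsy name (None, "", missing key) is replaced by the id.
    if not node.get("name"):
        node["name"] = node.get("id")
    return node["name"]

def default_name_is_none(node):
    # New check: only a missing or None name is replaced; "" passes through.
    if node.get("name", None) is None:
        node["name"] = node.get("id")
    return node["name"]
```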

Comment on lines +73 to +74
if sample_id:
logger.error(f"The sample_id parameter of {node} is deprecated, sample_id: {sample_id}")

suggestion: Logging error for deprecated sample_id may be too severe for legacy data.

Consider lowering the log level to warning or info for legacy sample_id values unless this is a critical error.

Suggested change
if sample_id:
logger.error(f"The sample_id parameter of {node} is deprecated, sample_id: {sample_id}")
if sample_id:
logger.warning(f"The sample_id parameter of {node} is deprecated, sample_id: {sample_id}")

print_status(f"Current working directory is {working_dir}", "info")
load_config_from_file(config_path)

# Reset the log level according to the configuration

suggestion: Dynamically setting log level from config may cause unexpected verbosity changes.

Validate the log level from the config and set a reasonable default to avoid issues with logging verbosity.
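One hedged way to do that validation, assuming the config yields a plain level-name string (function and constant names below are illustrative):

```python
import logging
from typing import Optional

VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}

def resolve_log_level(configured: Optional[str], default: str = "INFO") -> int:
    """Map a config string to a logging level constant, falling back to
    `default` when the value is missing or not a recognised level name."""
    name = (configured or default).upper()
    if name not in VALID_LEVELS:
        name = default
    return getattr(logging, name)
```

Calling `logging.getLogger().setLevel(resolve_log_level(config.get("log_level")))` then never raises on a typo in the config and keeps a predictable default verbosity.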

- name: Setup Python environment
uses: actions/setup-python@v5
- name: Setup Miniforge (with mamba)
uses: conda-incubator/setup-miniconda@v3

security (yaml.github-actions.security.third-party-action-not-pinned-to-commit-sha): An action sourced from a third-party repository on GitHub is not pinned to a full length commit SHA. Pinning an action to a full length commit SHA is currently the only way to use an action as an immutable release. Pinning to a particular SHA helps mitigate the risk of a bad actor adding a backdoor to the action's repository, as they would need to generate a SHA-1 collision for a valid Git object payload.

Source: opengrep
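A pinned step would look like the following sketch; the SHA here is a placeholder, not the real `setup-miniconda` release commit:

```yaml
- name: Setup Miniforge (with mamba)
  # Pin to a full-length commit SHA instead of a mutable tag; keep the
  # human-readable version in a trailing comment for auditability.
  uses: conda-incubator/setup-miniconda@0123456789abcdef0123456789abcdef01234567  # v3
```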

Comment on lines +960 to +962
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]

suggestion (code-quality): Swap if/else branches of if expression to remove negation (swap-if-expression)

Suggested change
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]
final_resources = [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
] if is_sequence else self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)


Explanation: Negated conditions are more difficult to read than positive ones, so it is best
to avoid them where we can. By swapping the if and else conditions around we
can invert the condition and make it positive.

Comment on lines +331 to +334
workflow_names = []
for workflow_id in self.workflow_sequence:
workflow_names.append(id_to_name.get(workflow_id, workflow_id))
return workflow_names

suggestion (code-quality): We've found these issues:

Suggested change
workflow_names = []
for workflow_id in self.workflow_sequence:
workflow_names.append(id_to_name.get(workflow_id, workflow_id))
return workflow_names
return [
id_to_name.get(workflow_id, workflow_id)
for workflow_id in self.workflow_sequence
]

@Xuwznln Xuwznln deleted the fix/resource-uuid-and-doc-fix branch November 14, 2025 18:10