Fix/resource UUID and doc fix #108
Conversation
Refactored the Bioyond workstation classes to improve parameter handling and workflow management. Updated experiment.py to use BioyondReactionStation with deck and material mappings, and enhanced workflow step parameter mapping and execution logic. Adjusted JSON experiment configs, improved workflow sequence handling, and added UUID assignment to PLR materials. Removed unused station_config and material cache logic, and added detailed docstrings and debug output for workflow methods.
Fixes "resource parent not found" errors by mapping UUIDs for all resources.
Reviewer's Guide

This PR overhauls resource UUID management across node drivers and containers, refactors the Bioyond reaction station workflow pipeline to support deck injection and end-to-end task execution, and fixes documentation build jobs by migrating to a Conda/Mamba-based setup.

Sequence diagram for resource UUID mapping and update in HostNode initialization

```mermaid
sequenceDiagram
participant HostNode
participant ResourceDictInstance
participant ResourceTreeInstance
participant HTTPClient
participant ResourceTracker
HostNode->>ResourceDictInstance: get_resource_instance_from_dict(host_node_dict)
HostNode->>ResourceTreeInstance: create ResourceTreeInstance(host_node_instance)
HostNode->>HTTPClient: resource_tree_add(resources_config, "", True)
HTTPClient-->>HostNode: returns uuid_mapping
HostNode->>ResourceTracker: loop_update_uuid(resource_instance, uuid_mapping)
```
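To make the UUID back-propagation step concrete, here is a minimal, self-contained sketch of the `loop_update_uuid` walk. The `_Res` stand-in class and keying the map by resource name are illustrative assumptions; the actual `ResourceTracker` implementation in the PR may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class _Res:  # hypothetical stand-in for a PLR resource node
    name: str
    uuid: Optional[str] = None
    children: List["_Res"] = field(default_factory=list)

def loop_update_uuid(resource: _Res, uuid_map: Dict[str, str]) -> None:
    """Walk the resource tree and apply server-assigned UUIDs (keyed by name)."""
    if resource.name in uuid_map:
        resource.uuid = uuid_map[resource.name]
    for child in resource.children:
        loop_update_uuid(child, uuid_map)

# Example: the host node and one child both receive UUIDs from the server.
root = _Res("host", children=[_Res("deck")])
loop_update_uuid(root, {"host": "uuid-1", "deck": "uuid-2"})
assert root.uuid == "uuid-1" and root.children[0].uuid == "uuid-2"
```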
Sequence diagram for BioyondReactionStation workflow execution

```mermaid
sequenceDiagram
participant User
participant BioyondReactionStation
participant HardwareInterface
User->>BioyondReactionStation: process_and_execute_workflow(workflow_name, task_name)
BioyondReactionStation->>BioyondReactionStation: get_workflow_sequence()
BioyondReactionStation->>BioyondReactionStation: process_web_workflows(web_workflow_json)
BioyondReactionStation->>BioyondReactionStation: merge_sequence_workflow(merge_json)
BioyondReactionStation->>HardwareInterface: workflow_step_query(workflow_query_json)
BioyondReactionStation->>BioyondReactionStation: generate_task_param_values(workflow_params_structure)
BioyondReactionStation->>HardwareInterface: create_order(task_json)
BioyondReactionStation-->>User: returns result
```
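A hypothetical call sequence for the pipeline above; the config contents, deck object, and workflow/task names are placeholders, not values from the PR, and a configured hardware interface is assumed.

```python
from unilabos.devices.workstation.bioyond_studio.reaction_station import BioyondReactionStation

# Sketch only: config keys and names below are invented for illustration.
station = BioyondReactionStation(config={"api_key": "<key>"}, deck=None)
result = station.process_and_execute_workflow(
    workflow_name="demo_workflow",
    task_name="demo_task",
)
if isinstance(result, dict) and result.get("success"):
    print(result["workflow"], result["task"])
```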
Class diagram for updated RegularContainer and BioyondReactionStation

```mermaid
classDiagram
class RegularContainer {
+Dict kwargs
+Dict state
+load_state(state: Dict)
}
RegularContainer --|> Container
class BioyondReactionStation {
+__init__(config: dict = None, deck=None)
+reactor_taken_out()
+reactor_taken_in(assign_material_name, cutoff, temperature)
+solid_feeding_vials(material_id, time, torque_variation, assign_material_name, temperature)
+liquid_feeding_vials_non_titration(volumeFormula, assign_material_name, titration_type, time, torque_variation, temperature)
+liquid_feeding_solvents(assign_material_name, volume, titration_type, time, torque_variation, temperature)
+liquid_feeding_titration(volume_formula, assign_material_name, titration_type, time, torque_variation, temperature)
+liquid_feeding_beaker(volume, assign_material_name, time, torque_variation, titrationType, temperature)
+get_workflow_sequence()
+workflow_step_query(workflow_id)
+create_order(json_str)
+process_and_execute_workflow(workflow_name, task_name)
+merge_sequence_workflow(json_str)
+generate_task_param_values(workflow_params_structure)
}
BioyondReactionStation --|> BioyondWorkstation
```

Class diagram for resource UUID management in ResourceDictInstance and ResourceTreeSet

```mermaid
classDiagram
class ResourceDictInstance {
+get_resource_instance_from_dict(content: Dict)
+get_nested_dict()
+children
+res_content
}
class ResourceTreeSet {
+trees
+set_resource_uuid(resource, new_uuid)
+loop_update_uuid(resource, uuid_map)
+add_resource(resource)
+to_plr_resources()
+from_raw_list(tree_data)
}
ResourceTreeSet "1" o-- "*" ResourceDictInstance
Hey there - I've reviewed your changes - here's some feedback:
Blocking issues:
- An action sourced from a third-party repository on GitHub is not pinned to a full length commit SHA. Pinning an action to a full length commit SHA is currently the only way to use an action as an immutable release. Pinning to a particular SHA helps mitigate the risk of a bad actor adding a backdoor to the action's repository, as they would need to generate a SHA-1 collision for a valid Git object payload. (link)
General comments:
- Replace the many `print(...)` debug statements with structured `logger` calls at appropriate levels and remove leftover commented-out or unused code to keep the codebase clean.
- The `process_and_execute_workflow` and `generate_task_param_values` methods have grown very large and complex; consider refactoring them into smaller, focused helper functions for readability and easier testing.
- There is duplicated and commented-out legacy logic around resource tree initialization (especially in HostNode and RegularContainer); clean up these blocks to avoid confusion and consolidate to a single clear implementation.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Replace the many `print(...)` debug statements with structured `logger` calls at appropriate levels and remove leftover commented‐out or unused code to keep the codebase clean.
- The `process_and_execute_workflow` and `generate_task_param_values` methods have grown very large and complex—consider refactoring them into smaller, focused helper functions for readability and easier testing.
- There’s duplicated and commented legacy logic around resource tree initialization (especially in HostNode and RegularContainer)—clean up these blocks to avoid confusion and consolidate to a single clear implementation.
## Individual Comments
### Comment 1
<location> `unilabos/resources/graphio.py:55-57` </location>
<code_context>
if not node.get("type"):
node["type"] = "device"
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
- if not node.get("name"):
+ if node.get("name", None) is None:
node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
</code_context>
<issue_to_address>
**nitpick:** Changing name existence check to explicit None comparison may allow empty string names.
Empty strings for 'name' will now bypass the defaulting logic. Please confirm if this change is intentional.
</issue_to_address>
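The behavioral difference the nitpick points out, in isolation:

```python
node = {"id": "n1", "name": ""}

# Old check: any falsy value ("" included) triggers the default.
print(not node.get("name"))            # True  -> name would be reset to node["id"]

# New check: only a missing or None name triggers the default.
print(node.get("name", None) is None)  # False -> the empty string is kept
```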
### Comment 2
<location> `unilabos/resources/graphio.py:73-74` </location>
<code_context>
z = node.pop("z", None)
if z is not None:
node["position"]["position"]["z"] = z
+ if "sample_id" in node:
+ sample_id = node.pop("sample_id")
+ if sample_id:
</code_context>
<issue_to_address>
**suggestion:** Logging error for deprecated sample_id may be too severe for legacy data.
Consider lowering the log level to warning or info for legacy sample_id values unless this is a critical error.
```suggestion
if sample_id:
logger.warning(f"{node}的sample_id参数已弃用,sample_id: {sample_id}")
```
</issue_to_address>
### Comment 3
<location> `unilabos/app/main.py:216` </location>
<code_context>
print_status(f"当前工作目录为 {working_dir}", "info")
load_config_from_file(config_path)
+
+ # 根据配置重新设置日志级别
+ from unilabos.utils.log import configure_logger, logger
+
</code_context>
<issue_to_address>
**suggestion:** Dynamically setting log level from config may cause unexpected verbosity changes.
Validate the log level from the config and set a reasonable default to avoid issues with logging verbosity.
</issue_to_address>
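One way to implement the suggested validation (a sketch; the `log_level` config key and the fallback behavior are assumptions, not the project's actual API):

```python
import logging

VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}

def resolve_log_level(config: dict, default: str = "INFO") -> str:
    """Return a validated log level name, falling back to a sane default."""
    level = str(config.get("log_level", default)).upper()
    if level not in VALID_LEVELS:
        logging.getLogger(__name__).warning(
            "Invalid log_level %r in config; falling back to %s", level, default
        )
        level = default
    return level

logging.basicConfig(level=resolve_log_level({"log_level": "verbose"}))  # -> INFO
```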
### Comment 4
<location> `.github/workflows/deploy-docs.yml:45` </location>
<code_context>
uses: conda-incubator/setup-miniconda@v3
</code_context>
<issue_to_address>
**security (yaml.github-actions.security.third-party-action-not-pinned-to-commit-sha):** An action sourced from a third-party repository on GitHub is not pinned to a full length commit SHA. Pinning an action to a full length commit SHA is currently the only way to use an action as an immutable release. Pinning to a particular SHA helps mitigate the risk of a bad actor adding a backdoor to the action's repository, as they would need to generate a SHA-1 collision for a valid Git object payload.
*Source: opengrep*
</issue_to_address>
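The fix pattern this finding asks for looks like the following (sketch; the SHA below is a placeholder, not the action's real release commit, and must be resolved from the action's repository before use):

```yaml
- name: Setup Miniforge (with mamba)
  # Placeholder SHA: resolve the actual commit for the v3 release before use.
  uses: conda-incubator/setup-miniconda@0000000000000000000000000000000000000000 # v3
```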
### Comment 5
<location> `unilabos/ros/nodes/base_device_node.py:960-962` </location>
<code_context>
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]
</code_context>
<issue_to_address>
**suggestion (code-quality):** Swap if/else branches of if expression to remove negation ([`swap-if-expression`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/swap-if-expression))
```suggestion
final_resources = [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
] if is_sequence else self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
```
<br/><details><summary>Explanation</summary>Negated conditions are more difficult to read than positive ones, so it is best
to avoid them where we can. By swapping the `if` and `else` conditions around we
can invert the condition and make it positive.
</details>
</issue_to_address>
### Comment 6
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:331-334` </location>
<code_context>
def get_workflow_sequence(self) -> List[str]:
"""获取当前工作流执行顺序
Returns:
工作流名称列表
"""
id_to_name = {workflow_id: name for name, workflow_id in self.workflow_mappings.items()}
workflow_names = []
for workflow_id in self.workflow_sequence:
workflow_names.append(id_to_name.get(workflow_id, workflow_id))
return workflow_names
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
```suggestion
return [
id_to_name.get(workflow_id, workflow_id)
for workflow_id in self.workflow_sequence
]
```
</issue_to_address>
### Comment 7
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:374` </location>
<code_context>
def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
web_workflow_list = self.get_workflow_sequence()
workflow_name = workflow_name
pending_params_backup = self.pending_task_params.copy()
print(f"保存pending_task_params副本,共{len(pending_params_backup)}个参数")
# 1. 处理网页工作流列表
print(f"处理网页工作流列表: {web_workflow_list}")
web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
workflows_result = self.process_web_workflows(web_workflow_json)
if not workflows_result:
error_msg = "处理网页工作流列表失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "process_web_workflows"})
return result
# 2. 合并工作流序列
print(f"合并工作流序列,名称: {workflow_name}")
merge_json = json.dumps({"name": workflow_name})
merged_workflow = self.merge_sequence_workflow(merge_json)
print(f"合并工作流序列结果: {merged_workflow}")
if not merged_workflow:
error_msg = "合并工作流序列失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "merge_sequence_workflow"})
return result
# 3. 合并所有参数并创建任务
# 新API只返回状态信息,需要适配处理
if isinstance(merged_workflow, dict) and merged_workflow.get("code") == 1:
# 新API返回格式:{code: 1, message: "", timestamp: 0}
# 使用传入的工作流名称和生成的临时ID
final_workflow_name = workflow_name
workflow_id = f"merged_{workflow_name}_{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}"
print(f"新API合并成功,使用工作流创建任务: {final_workflow_name} (临时ID: {workflow_id})")
else:
# 旧API返回格式:包含详细工作流信息
final_workflow_name = merged_workflow.get("name", workflow_name)
workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
print(f"旧API格式,使用工作流创建任务: {final_workflow_name} (ID: {workflow_id})")
if not workflow_id:
error_msg = "无法获取工作流ID"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "get_workflow_id"})
return result
workflow_query_json = json.dumps({"workflow_id": workflow_id})
workflow_params_structure = self.workflow_step_query(workflow_query_json)
self.pending_task_params = pending_params_backup
print(f"恢复pending_task_params,共{len(self.pending_task_params)}个参数")
param_values = self.generate_task_param_values(workflow_params_structure)
task_params = [{
"orderCode": f"BSO{self.hardware_interface.get_current_time_iso8601().replace('-', '').replace('T', '').replace(':', '').replace('.', '')[:14]}",
"orderName": f"实验-{self.hardware_interface.get_current_time_iso8601()[:10].replace('-', '')}",
"workFlowId": workflow_id,
"borderNumber": 1,
"paramValues": param_values,
"extendProperties": ""
}]
task_json = json.dumps(task_params)
print(f"创建任务参数: {type(task_json)}")
result = self.create_order(task_json)
if not result:
error_msg = "创建任务失败"
print(error_msg)
result = str({"success": False, "error": f"process_and_execute_workflow:{error_msg}", "method": "process_and_execute_workflow", "step": "create_order"})
return result
print(f"任务创建成功: {result}")
self.pending_task_params.clear()
print("已清空pending_task_params")
return {
"success": True,
"workflow": {"name": final_workflow_name, "id": workflow_id},
"task": result,
"method": "process_and_execute_workflow"
}
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Inline variable that is immediately returned [×3] ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
- Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>
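For the extract-duplicate-method finding, the four error branches all build the same payload; a hypothetical helper (the name `_failure` is invented, not from the PR) removes the repetition. Note the original wraps the dict in `str(...)`; returning the dict directly is the cleaner contract if callers allow it.

```python
class _Sketch:
    """Illustration only; the real method would live on BioyondReactionStation."""

    def _failure(self, step: str, error_msg: str) -> dict:
        return {
            "success": False,
            "error": f"process_and_execute_workflow:{error_msg}",
            "method": "process_and_execute_workflow",
            "step": step,
        }

# Each error branch then collapses to a single line, e.g.:
#     if not workflows_result:
#         return self._failure("process_web_workflows", "处理网页工作流列表失败")
print(_Sketch()._failure("create_order", "创建任务失败"))
```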
### Comment 8
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:490-491` </location>
<code_context>
def merge_sequence_workflow(self, json_str: str) -> dict:
"""合并当前工作流序列
Args:
json_str: 包含name等参数的JSON字符串
Returns:
合并结果
"""
try:
data = json.loads(json_str)
name = data.get("name", "合并工作流")
step_parameters = data.get("stepParameters", {})
variables = data.get("variables", {})
except json.JSONDecodeError:
return {}
if not self.workflow_sequence:
print("工作流序列为空,无法合并")
return {}
# 将工作流ID列表转换为新API要求的格式
workflows = [{"id": workflow_id} for workflow_id in self.workflow_sequence]
# 构建新的API参数格式
params = {
"name": name,
"workflows": workflows,
"stepParameters": step_parameters,
"variables": variables
}
# 使用新的API接口
response = self.hardware_interface.post(
url=f'{self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters',
params={
"apiKey": self.hardware_interface.api_key,
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": params,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
</issue_to_address>
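Applied together, the three findings collapse the method's tail into a single expression (sketch; the dict literal stands in for the `hardware_interface.post` result):

```python
response = {"code": 1, "data": {"name": "merged"}}  # stand-in for the POST result

# reintroduce-else + swap-if-else-branches + assign-if-exp, in one step:
merged = response.get("data", {}) if response and response.get("code") == 1 else {}
assert merged == {"name": "merged"}
```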
### Comment 9
<location> `unilabos/devices/workstation/bioyond_studio/reaction_station.py:494` </location>
<code_context>
def generate_task_param_values(self, workflow_params_structure: dict) -> dict:
"""生成任务参数值
根据工作流参数结构和待处理的任务参数,生成最终的任务参数值
Args:
workflow_params_structure: 工作流参数结构
Returns:
任务参数值字典
"""
if not workflow_params_structure:
print("workflow_params_structure为空")
return {}
data = workflow_params_structure
# 从pending_task_params中提取实际参数值,按DisplaySectionName和Key组织
pending_params_by_section = {}
print(f"开始处理pending_task_params,共{len(self.pending_task_params)}个任务参数组")
# 获取工作流执行顺序,用于按顺序匹配参数
workflow_sequence = self.get_workflow_sequence()
print(f"工作流执行顺序: {workflow_sequence}")
workflow_index = 0
# 遍历所有待处理的任务参数
for i, task_param in enumerate(self.pending_task_params):
if 'param_values' in task_param:
print(f"处理第{i+1}个任务参数组,包含{len(task_param['param_values'])}个步骤")
if workflow_index < len(workflow_sequence):
current_workflow = workflow_sequence[workflow_index]
section_name = WORKFLOW_TO_SECTION_MAP.get(current_workflow)
print(f" 匹配到工作流: {current_workflow} -> {section_name}")
workflow_index += 1
else:
print(f" 警告: 参数组{i+1}超出了工作流序列范围")
continue
if not section_name:
print(f" 警告: 工作流{current_workflow}没有对应的DisplaySectionName")
continue
if section_name not in pending_params_by_section:
pending_params_by_section[section_name] = {}
# 处理每个步骤的参数
for step_id, param_list in task_param['param_values'].items():
print(f" 步骤ID: {step_id},参数数量: {len(param_list)}")
for param_item in param_list:
key = param_item.get('Key', '')
value = param_item.get('Value', '')
m = param_item.get('m', 0)
n = param_item.get('n', 0)
print(f" 参数: {key} = {value} (m={m}, n={n}) -> 分组到{section_name}")
param_key = f"{section_name}.{key}"
if param_key not in pending_params_by_section[section_name]:
pending_params_by_section[section_name][param_key] = []
pending_params_by_section[section_name][param_key].append({
'value': value,
'm': m,
'n': n
})
print(f"pending_params_by_section构建完成,包含{len(pending_params_by_section)}个分组")
# 收集所有参数,过滤TaskDisplayable为0的项
filtered_params = []
for step_id, step_info in data.items():
if isinstance(step_info, list):
for step_item in step_info:
param_list = step_item.get("parameterList", [])
for param in param_list:
if param.get("TaskDisplayable") == 0:
continue
param_with_step = param.copy()
param_with_step['step_id'] = step_id
param_with_step['step_name'] = step_item.get("name", "")
param_with_step['step_m'] = step_item.get("m", 0)
param_with_step['step_n'] = step_item.get("n", 0)
filtered_params.append(param_with_step)
# 按DisplaySectionIndex排序
filtered_params.sort(key=lambda x: x.get('DisplaySectionIndex', 0))
# 生成参数映射
param_mapping = {}
step_params = {}
for param in filtered_params:
step_id = param['step_id']
if step_id not in step_params:
step_params[step_id] = []
step_params[step_id].append(param)
# 为每个步骤生成参数
for step_id, params in step_params.items():
param_list = []
for param in params:
key = param.get('Key', '')
display_section_index = param.get('DisplaySectionIndex', 0)
step_m = param.get('step_m', 0)
step_n = param.get('step_n', 0)
section_name = param.get('DisplaySectionName', '')
param_key = f"{section_name}.{key}"
if section_name in pending_params_by_section and param_key in pending_params_by_section[section_name]:
pending_param_list = pending_params_by_section[section_name][param_key]
if pending_param_list:
pending_param = pending_param_list[0]
value = pending_param['value']
m = step_m
n = step_n
print(f" 匹配成功: {section_name}.{key} = {value} (m={m}, n={n})")
pending_param_list.pop(0)
else:
value = "1"
m = step_m
n = step_n
print(f" 匹配失败: {section_name}.{key},参数列表为空,使用默认值 = {value}")
else:
value = "1"
m = display_section_index
n = step_n
print(f" 匹配失败: {section_name}.{key},使用默认值 = {value} (m={m}, n={n})")
param_item = {
"m": m,
"n": n,
"key": key,
"value": str(value).strip()
}
param_list.append(param_item)
if param_list:
param_mapping[step_id] = param_list
print(f"生成任务参数值,包含 {len(param_mapping)} 个步骤")
return param_mapping
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Hoist repeated code outside conditional statement [×2] ([`hoist-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-statement-from-if/))
- Low code quality found in BioyondReactionStation.generate\_task\_param\_values - 6% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
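As a first extraction, the step-grouping loop in the middle of `generate_task_param_values` is self-contained and easy to pull out (sketch; the function name is invented):

```python
from collections import defaultdict
from typing import Dict, List

def group_params_by_step(filtered_params: List[dict]) -> Dict[str, List[dict]]:
    """Group the filtered, sorted parameters by their step id."""
    step_params: Dict[str, List[dict]] = defaultdict(list)
    for param in filtered_params:
        step_params[param["step_id"]].append(param)
    return dict(step_params)

params = [
    {"step_id": "s1", "Key": "a"},
    {"step_id": "s1", "Key": "b"},
    {"step_id": "s2", "Key": "c"},
]
assert list(group_params_by_step(params)) == ["s1", "s2"]
```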
### Comment 10
<location> `unilabos/devices/workstation/bioyond_studio/station.py:331-333` </location>
<code_context>
def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
# 检查是否为JSON格式的字符串
actual_workflow_name = web_workflow_name
if web_workflow_name.startswith('{') and web_workflow_name.endswith('}'):
try:
data = json.loads(web_workflow_name)
actual_workflow_name = data.get("web_workflow_name", web_workflow_name)
print(f"解析JSON格式工作流名称: {web_workflow_name} -> {actual_workflow_name}")
except json.JSONDecodeError:
print(f"JSON解析失败,使用原始字符串: {web_workflow_name}")
workflow_id = self._get_workflow(actual_workflow_name)
if workflow_id:
self.workflow_sequence.append(workflow_id)
print(f"添加工作流到执行顺序: {actual_workflow_name} -> {workflow_id}")
return True
return False
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
```suggestion
if workflow_id := self._get_workflow(actual_workflow_name):
```
</issue_to_address>
Adapt the UUID resource manager to RegularContainer and WorkstationNode.
Fix the docs build.
Summary by Sourcery
Adapt resource UUID management across core components and workstation nodes, enhance Bioyond workstation drivers with deck support and a full-featured workflow orchestration API, and fix documentation CI and examples to build reliably under a conda/Mamba environment.
New Features:
- Add deck and material-mapping support plus an end-to-end workflow orchestration API (sequence query, workflow merge, parameter generation, order creation) to the Bioyond reaction station.

Bug Fixes:
- Fix "resource parent not found" by assigning and propagating UUIDs to all PLR resources.

Enhancements:
- Improve workflow step parameter mapping and execution logic, and remove the unused station_config and material cache logic.

CI:
- Migrate the documentation build workflow to a Conda/Mamba-based environment.

Documentation:
- Add detailed docstrings for the workflow methods and fix the documentation examples to build reliably.