Conversation
Reviewer's Guide

Introduced concurrent workflow execution by adding a new `process_workflow_parallel` method and enhanced the demo's interactive menu to let users run workflows either sequentially or in parallel.

Sequence diagram for parallel workflow execution

```mermaid
sequenceDiagram
    actor User
    participant Demo as demo_local_agents
    participant Orchestrator
    participant Agent1 as monitor_agent
    participant Agent2 as file_agent
    participant Agent3 as chat_agent
    User->>Demo: Selects "Run workflow (parallel)"
    Demo->>Orchestrator: process_workflow_parallel(workflow)
    Orchestrator->>Agent1: monitor_performance() (async)
    Orchestrator->>Agent2: file_operation(create, test.txt) (async)
    Orchestrator->>Agent3: process_request(status) (async)
    Orchestrator->>Agent2: file_operation(read, test.txt) (async)
    Note over Orchestrator,Agent3: All agent tasks run concurrently
    Agent1-->>Orchestrator: result
    Agent2-->>Orchestrator: result
    Agent3-->>Orchestrator: result
    Orchestrator-->>Demo: results
    Demo-->>User: Display results
```
Class diagram for Orchestrator with parallel workflow method

```mermaid
classDiagram
    class Orchestrator {
        +process_workflow(workflow: list) dict
        +process_workflow_parallel(workflow: list) dict
    }
```
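For orientation, here is a minimal sketch of how a caller might use the two entry points shown in the class diagram, assuming the workflow step shape from the sequence diagram above. The stub orchestrator is hypothetical and only stands in for the real class in `demo_local_agents.py`:

```python
import asyncio

class _StubOrchestrator:
    """Hypothetical stand-in for the real Orchestrator in demo_local_agents.py."""
    async def process_workflow(self, workflow: list) -> dict:
        # Pretend each step ran one at a time.
        return {f'{s["agent"]}_{s["function"]}': "ok (sequential)" for s in workflow}

    async def process_workflow_parallel(self, workflow: list) -> dict:
        # Pretend each step was scheduled concurrently.
        return {f'{s["agent"]}_{s["function"]}': "ok (parallel)" for s in workflow}

workflow = [
    {"agent": "monitor_agent", "function": "monitor_performance"},
    {"agent": "file_agent", "function": "file_operation", "params": {"operation": "create", "filename": "test.txt"}},
]

async def main():
    orchestrator = _StubOrchestrator()
    print(await orchestrator.process_workflow(workflow))           # steps run one after another
    print(await orchestrator.process_workflow_parallel(workflow))  # steps scheduled concurrently

asyncio.run(main())
```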
Pull Request Overview
This PR adds parallel workflow execution capabilities to the local agent demo, allowing workflows to run concurrently rather than sequentially. This enables better performance for independent tasks that can be executed simultaneously.
Key changes:
- Added `process_workflow_parallel` method for concurrent task execution
- Updated demo menu to offer both sequential and parallel workflow options
- Enhanced documentation to reflect parallel workflow support
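To make the concurrent path concrete, here is a minimal, self-contained sketch of the scheduling pattern the review snippets show (`asyncio.create_task` followed by `asyncio.gather(..., return_exceptions=True)`). The `run_steps_concurrently` helper and the fake agent calls are illustrative only, not code from the PR:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def run_steps_concurrently(steps: dict) -> dict:
    """Schedule every step as a task, then gather results so one failure doesn't cancel the rest."""
    results = {}
    tasks, task_names = [], []
    for name, make_coro in steps.items():
        try:
            tasks.append(asyncio.create_task(make_coro()))
            task_names.append(name)
        except Exception as e:  # scheduling itself failed (e.g. unknown agent or function)
            results[name] = f"Error: {e}"
    completed = await asyncio.gather(*tasks, return_exceptions=True)
    for name, res in zip(task_names, completed):
        results[name] = f"Error: {res}" if isinstance(res, Exception) else str(res)
    return results

async def _demo():
    async def fake_agent_call(msg: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return msg

    steps = {
        "monitor_agent_monitor_performance": lambda: fake_agent_call("cpu ok", 0.2),
        "file_agent_file_operation": lambda: fake_agent_call("created test.txt", 0.1),
    }
    print(await run_steps_concurrently(steps))

if __name__ == "__main__":
    asyncio.run(_demo())
```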
Hey @Bryan-Roe - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `demo_local_agents.py:148` </location>
<code_context>
+ logger.error(f"❌ Failed to schedule {agent_id}.{function_name} - {e}")
+ results[f"{agent_id}_{function_name}"] = f"Error: {e}"
+
+ completed = await asyncio.gather(*tasks, return_exceptions=True)
+ for name, res in zip(task_names, completed):
+ if isinstance(res, Exception):
</code_context>
<issue_to_address>
Potential issue if all tasks fail to schedule.
Consider adding a warning or log message if no tasks are scheduled, as this may indicate a configuration issue and the results will only reflect scheduling errors.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(task_names, completed):
=======
if not tasks:
logger.warning("⚠️ No tasks were scheduled. This may indicate a configuration issue. Results will only reflect scheduling errors.")
completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(task_names, completed):
>>>>>>> REPLACE
</suggested_fix>
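As background for this suggestion: `asyncio.gather` called with no awaitables simply completes with an empty list, so an empty `tasks` list finishes silently and the returned results contain nothing but scheduling errors. A small stand-alone illustration (not code from the PR):

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    tasks = []  # imagine every step failed to schedule
    if not tasks:
        logger.warning("⚠️ No tasks were scheduled. Results will only reflect scheduling errors.")
    completed = await asyncio.gather(*tasks, return_exceptions=True)
    print(completed)  # [] -- gathering zero awaitables resolves immediately to an empty list

asyncio.run(main())
```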
### Comment 2
<location> `demo_local_agents.py:128` </location>
<code_context>
return results
+ async def process_workflow_parallel(self, workflow: list) -> dict:
+ """Process workflow tasks concurrently across agents"""
+ results = {}
</code_context>
<issue_to_address>
Consider merging the sequential and parallel workflow methods into a single function with a parallel flag and reusing the workflow definition in the demo.
You can collapse the two nearly identical methods and demo branches by:
1. Merging `process_workflow` and `process_workflow_parallel` into one `process_workflow` with a `parallel: bool` flag.
2. Extracting the demo workflow definition once and re-using it for both sequential and parallel runs.

Example refactor:

```python
class AGIWorkflowOrchestrator:
    async def process_workflow(self, workflow: list, parallel: bool = False) -> dict:
        """Process workflow tasks, either sequentially or in parallel."""
        results = {}
        tasks, names = [], []
        for step in workflow:
            agent = step["agent"]
            fn = step["function"]
            params = step.get("params", {})
            try:
                function = self.kernel.get_function(agent, fn)
                if parallel:
                    tasks.append(asyncio.create_task(function.invoke(self.kernel, **params)))
                    names.append(f"{agent}_{fn}")
                    logger.info(f"🚀 Scheduled: {agent}.{fn}")
                else:
                    res = await function.invoke(self.kernel, **params)
                    results[f"{agent}_{fn}"] = str(res)
                    logger.info(f"✅ Completed: {agent}.{fn}")
            except Exception as e:
                logger.error(f"❌ Failed {agent}.{fn} - {e}")
                results[f"{agent}_{fn}"] = f"Error: {e}"
        if parallel and tasks:
            completed = await asyncio.gather(*tasks, return_exceptions=True)
            for name, res in zip(names, completed):
                if isinstance(res, Exception):
                    logger.error(f"❌ Failed: {name} - {res}")
                    results[name] = f"Error: {res}"
                else:
                    logger.info(f"✅ Completed: {name}")
                    results[name] = str(res)
        return results
```
Then in your demo:

```python
async def demo_local_agents():
    demo_workflow = [
        {"agent": "monitor_agent", "function": "monitor_performance"},
        {"agent": "file_agent", "function": "file_operation", "params": {"operation": "create", "filename": "test.txt"}},
        {"agent": "chat_agent", "function": "process_request", "params": {"request": "status"}},
        {"agent": "file_agent", "function": "file_operation", "params": {"operation": "read", "filename": "test.txt"}},
    ]
    # inside your menu:
    if choice in ("5", "6"):
        parallel = (choice == "6")
        mode = "in parallel" if parallel else "sequentially"
        print(f"🔄 Running workflow {mode}...")
        results = await orchestrator.process_workflow(demo_workflow, parallel=parallel)
        for step, res in results.items():
            print(f"   ✅ {step}: {res}")
```

This removes duplication in both the executor and the demo.
</issue_to_address>
issue (code-quality): Low code quality found in demo_local_agents - 24% (low-code-quality)
Explanation
The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
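One hypothetical way to act on this advice (the helper names and stub orchestrator below are illustrative, not code from the PR) is to move each menu action into its own small coroutine and dispatch through a mapping, which shortens `demo_local_agents` and replaces nesting with a guard clause:

```python
import asyncio

class _StubOrchestrator:
    """Hypothetical stand-in for the real orchestrator in demo_local_agents.py."""
    async def process_workflow(self, workflow: list, parallel: bool = False) -> dict:
        mode = "parallel" if parallel else "sequential"
        return {f'{s["agent"]}_{s["function"]}': f"ok ({mode})" for s in workflow}

async def _run_workflow(orchestrator, workflow: list, parallel: bool) -> None:
    # Small, focused helper: run the workflow and print each step's result.
    results = await orchestrator.process_workflow(workflow, parallel=parallel)
    for step, res in results.items():
        print(f"   ✅ {step}: {res}")

async def demo_local_agents() -> None:
    orchestrator = _StubOrchestrator()
    workflow = [{"agent": "monitor_agent", "function": "monitor_performance"}]
    actions = {
        "5": lambda: _run_workflow(orchestrator, workflow, parallel=False),
        "6": lambda: _run_workflow(orchestrator, workflow, parallel=True),
    }
    choice = input("Select an option (5=sequential, 6=parallel): ").strip()
    action = actions.get(choice)
    if action is None:  # guard clause: bail out early instead of nesting the happy path
        print("Unknown option")
        return
    await action()

if __name__ == "__main__":
    asyncio.run(demo_local_agents())
```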
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Bryan <74067792+Bryan-Roe@users.noreply.github.com>
Summary
- Added `process_workflow_parallel` for concurrent agent tasks

Testing
- `python -m pytest -q` (fails: watchdog not installed)

https://chatgpt.com/codex/tasks/task_e_6886a6fca05c8322b72753b6afc0aede
Summary by Sourcery
Enable parallel execution of workflow steps in the local agent demo by introducing a concurrent processing method and updating the interactive menu accordingly.
New Features:
- Add a `process_workflow_parallel` method that runs workflow steps concurrently across agents.

Enhancements:
- Offer both sequential and parallel workflow runs from the demo's interactive menu.