Skip to content

Add parallel workflow execution#761

Merged
Bryan-Roe merged 3 commits intomainfrom
codex/add-support-for-multiple-agents
Aug 3, 2025
Merged

Add parallel workflow execution#761
Bryan-Roe merged 3 commits intomainfrom
codex/add-support-for-multiple-agents

Conversation

@Bryan-Roe
Copy link
Collaborator

@Bryan-Roe Bryan-Roe commented Jul 27, 2025

Summary

  • update local agent demo to run workflows sequentially or in parallel
  • add process_workflow_parallel for concurrent agent tasks

Testing

  • python -m pytest -q (fails: watchdog not installed)

https://chatgpt.com/codex/tasks/task_e_6886a6fca05c8322b72753b6afc0aede

Summary by Sourcery

Enable parallel execution of workflow steps in the local agent demo by introducing a concurrent processing method and updating the interactive menu accordingly.

New Features:

  • Add process_workflow_parallel method to run workflow tasks concurrently

Enhancements:

  • Update demo menu to include options for sequential and parallel workflow execution
  • Enhance top-level docstring to note parallel workflow support

Copilot AI review requested due to automatic review settings July 27, 2025 22:31
@sourcery-ai
Copy link

sourcery-ai bot commented Jul 27, 2025

Reviewer's Guide

Introduced concurrent workflow execution by adding a new process_workflow_parallel method and enhanced the demo’s interactive menu to let users run workflows either sequentially or in parallel.

Sequence diagram for parallel workflow execution

sequenceDiagram
    actor User
    participant Demo as demo_local_agents
    participant Orchestrator
    participant Agent1 as monitor_agent
    participant Agent2 as file_agent
    participant Agent3 as chat_agent

    User->>Demo: Selects "Run workflow (parallel)"
    Demo->>Orchestrator: process_workflow_parallel(workflow)
    Orchestrator->>Agent1: monitor_performance() (async)
    Orchestrator->>Agent2: file_operation(create, test.txt) (async)
    Orchestrator->>Agent3: process_request(status) (async)
    Orchestrator->>Agent2: file_operation(read, test.txt) (async)
    Note over Orchestrator,Agent3: All agent tasks run concurrently
    Agent1-->>Orchestrator: result
    Agent2-->>Orchestrator: result
    Agent3-->>Orchestrator: result
    Orchestrator-->>Demo: results
    Demo-->>User: Display results
Loading

Class diagram for Orchestrator with parallel workflow method

classDiagram
    class Orchestrator {
        +process_workflow(workflow: list) dict
        +process_workflow_parallel(workflow: list) dict
    }
Loading

File-Level Changes

Change Details Files
Parallel workflow execution feature
  • Added async method process_workflow_parallel to schedule and run tasks concurrently
  • Used asyncio.create_task and asyncio.gather with return_exceptions to capture results and errors
  • Logged task scheduling, completion, and failures for better observability
demo_local_agents.py
Interactive demo menu augmentation
  • Renumbered menu options to include separate entries for sequential and parallel workflows
  • Added choice handling for running process_workflow_parallel and displaying its results
  • Updated exit option to reflect new menu structure
demo_local_agents.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds parallel workflow execution capabilities to the local agent demo, allowing workflows to run concurrently rather than sequentially. This enables better performance for independent tasks that can be executed simultaneously.

Key changes:

  • Added process_workflow_parallel method for concurrent task execution
  • Updated demo menu to offer both sequential and parallel workflow options
  • Enhanced documentation to reflect parallel workflow support

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Bryan-Roe - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `demo_local_agents.py:148` </location>
<code_context>
+                logger.error(f"❌ Failed to schedule {agent_id}.{function_name} - {e}")
+                results[f"{agent_id}_{function_name}"] = f"Error: {e}"
+
+        completed = await asyncio.gather(*tasks, return_exceptions=True)
+        for name, res in zip(task_names, completed):
+            if isinstance(res, Exception):
</code_context>

<issue_to_address>
Potential issue if all tasks fail to schedule.

Consider adding a warning or log message if no tasks are scheduled, as this may indicate a configuration issue and the results will only reflect scheduling errors.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH

        completed = await asyncio.gather(*tasks, return_exceptions=True)
        for name, res in zip(task_names, completed):
=======

        if not tasks:
            logger.warning("⚠️ No tasks were scheduled. This may indicate a configuration issue. Results will only reflect scheduling errors.")

        completed = await asyncio.gather(*tasks, return_exceptions=True)
        for name, res in zip(task_names, completed):
>>>>>>> REPLACE

</suggested_fix>

### Comment 2
<location> `demo_local_agents.py:128` </location>
<code_context>

         return results

+    async def process_workflow_parallel(self, workflow: list) -> dict:
+        """Process workflow tasks concurrently across agents"""
+        results = {}
</code_context>

<issue_to_address>
Consider merging the sequential and parallel workflow methods into a single function with a parallel flag and reusing the workflow definition in the demo.

```suggestion
You can collapse the two nearly identical methods and demo branches by:

1. Merging `process_workflow` and `process_workflow_parallel` into one `process_workflow` with a `parallel: bool` flag.
2. Extracting the demo workflow definition once and re-using it for both sequential and parallel runs.

Example refactor:

```python
class AGIWorkflowOrchestrator:
    async def process_workflow(self, workflow: list, parallel: bool = False) -> dict:
        """Process workflow tasks, either sequentially or in parallel."""
        results = {}
        tasks, names = [], []

        for step in workflow:
            agent = step["agent"]
            fn    = step["function"]
            params = step.get("params", {})
            try:
                function = self.kernel.get_function(agent, fn)
                if parallel:
                    tasks.append(asyncio.create_task(function.invoke(self.kernel, **params)))
                    names.append(f"{agent}_{fn}")
                    logger.info(f"🚀 Scheduled: {agent}.{fn}")
                else:
                    res = await function.invoke(self.kernel, **params)
                    results[f"{agent}_{fn}"] = str(res)
                    logger.info(f"✅ Completed: {agent}.{fn}")
            except Exception as e:
                logger.error(f"❌ Failed {agent}.{fn} - {e}")
                results[f"{agent}_{fn}"] = f"Error: {e}"

        if parallel and tasks:
            completed = await asyncio.gather(*tasks, return_exceptions=True)
            for name, res in zip(names, completed):
                if isinstance(res, Exception):
                    logger.error(f"❌ Failed: {name} - {res}")
                    results[name] = f"Error: {res}"
                else:
                    logger.info(f"✅ Completed: {name}")
                    results[name] = str(res)

        return results
```

Then in your demo:

```python
async def demo_local_agents():
    demo_workflow = [
        {"agent": "monitor_agent", "function": "monitor_performance"},
        {"agent": "file_agent",    "function": "file_operation", "params": {"operation":"create","filename":"test.txt"}},
        {"agent": "chat_agent",    "function": "process_request", "params": {"request":"status"}},
        {"agent": "file_agent",    "function": "file_operation", "params": {"operation":"read","filename":"test.txt"}},
    ]

    # inside your menu:
    if choice in ("5", "6"):
        parallel = (choice == "6")
        mode = "in parallel" if parallel else "sequentially"
        print(f"🔄 Running workflow {mode}...")
        results = await orchestrator.process_workflow(demo_workflow, parallel=parallel)
        for step, res in results.items():
            print(f"{step}: {res}")
```

This removes duplication in both the executor and the demo.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +147 to +149

completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(task_names, completed):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Potential issue if all tasks fail to schedule.

Consider adding a warning or log message if no tasks are scheduled, as this may indicate a configuration issue and the results will only reflect scheduling errors.

Suggested change
completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(task_names, completed):
if not tasks:
logger.warning("⚠️ No tasks were scheduled. This may indicate a configuration issue. Results will only reflect scheduling errors.")
completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(task_names, completed):


return results

async def process_workflow_parallel(self, workflow: list) -> dict:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider merging the sequential and parallel workflow methods into a single function with a parallel flag and reusing the workflow definition in the demo.

Suggested change
async def process_workflow_parallel(self, workflow: list) -> dict:
You can collapse the two nearly identical methods and demo branches by:
1. Merging `process_workflow` and `process_workflow_parallel` into one `process_workflow` with a `parallel: bool` flag.
2. Extracting the demo workflow definition once and re-using it for both sequential and parallel runs.
Example refactor:
```python
class AGIWorkflowOrchestrator:
async def process_workflow(self, workflow: list, parallel: bool = False) -> dict:
"""Process workflow tasks, either sequentially or in parallel."""
results = {}
tasks, names = [], []
for step in workflow:
agent = step["agent"]
fn = step["function"]
params = step.get("params", {})
try:
function = self.kernel.get_function(agent, fn)
if parallel:
tasks.append(asyncio.create_task(function.invoke(self.kernel, **params)))
names.append(f"{agent}_{fn}")
logger.info(f"🚀 Scheduled: {agent}.{fn}")
else:
res = await function.invoke(self.kernel, **params)
results[f"{agent}_{fn}"] = str(res)
logger.info(f"✅ Completed: {agent}.{fn}")
except Exception as e:
logger.error(f"❌ Failed {agent}.{fn} - {e}")
results[f"{agent}_{fn}"] = f"Error: {e}"
if parallel and tasks:
completed = await asyncio.gather(*tasks, return_exceptions=True)
for name, res in zip(names, completed):
if isinstance(res, Exception):
logger.error(f"❌ Failed: {name} - {res}")
results[name] = f"Error: {res}"
else:
logger.info(f"✅ Completed: {name}")
results[name] = str(res)
return results

Then in your demo:

async def demo_local_agents():
    demo_workflow = [
        {"agent": "monitor_agent", "function": "monitor_performance"},
        {"agent": "file_agent",    "function": "file_operation", "params": {"operation":"create","filename":"test.txt"}},
        {"agent": "chat_agent",    "function": "process_request", "params": {"request":"status"}},
        {"agent": "file_agent",    "function": "file_operation", "params": {"operation":"read","filename":"test.txt"}},
    ]

    # inside your menu:
    if choice in ("5", "6"):
        parallel = (choice == "6")
        mode = "in parallel" if parallel else "sequentially"
        print(f"🔄 Running workflow {mode}...")
        results = await orchestrator.process_workflow(demo_workflow, parallel=parallel)
        for step, res in results.items():
            print(f"  ✅ {step}: {res}")

This removes duplication in both the executor and the demo.


return results

async def demo_local_agents():
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Low code quality found in demo_local_agents - 24% (low-code-quality)


ExplanationThe quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

Bryan-Roe and others added 2 commits August 3, 2025 01:32
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Bryan <74067792+Bryan-Roe@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Bryan <74067792+Bryan-Roe@users.noreply.github.com>
@Bryan-Roe Bryan-Roe merged commit e223f90 into main Aug 3, 2025
7 of 11 checks passed
@Bryan-Roe Bryan-Roe deleted the codex/add-support-for-multiple-agents branch August 3, 2025 08:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant