
Agentic parallelization with different agents/tasks: tasks repeat in a loop, aggregator never runs, process never completes. #864

@mzazakeith

Description


When using workflow process mode in PraisonAIAgents with multiple agents performing different tasks in parallel, all of the initial tasks start and execute, but the same tasks are then re-executed repeatedly in a loop. The aggregator/summary task, which depends on the completion of the parallel tasks, never runs, and the process never completes. This happens even with simple, local tools and no LLM or network dependencies, and even when each agent and task is unique.
Both scripts below are based on the documented example: Agentic Parallelization.

Environment

  • Provider (select one):
    • Anthropic
    • OpenAI
    • Google Vertex AI
    • AWS Bedrock
    • Other: Google Gemini (gemini/gemini-2.5-flash-lite-preview-06-17, per the llm= setting in the scripts below)
  • PraisonAI version: PraisonAI==2.2.53
  • Operating System: macOS 15.4

Full Code

# agentic_parallelization_example.py

import asyncio
from praisonaiagents import Agent, Task, PraisonAIAgents

# Define a simple tool for demonstration
def square_number(n: int) -> int:
    """Returns the square of a number."""
    return n * n

# Create three agents, each will square a different number
agent1 = Agent(
    name="SquareAgent1",
    role="Squares numbers",
    goal="Square a given number",
    tools=[square_number],
    verbose=True,
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
)
agent2 = Agent(
    name="SquareAgent2",
    role="Squares numbers",
    goal="Square a given number",
    tools=[square_number],
    verbose=True,
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
)
agent3 = Agent(
    name="SquareAgent3",
    role="Squares numbers",
    goal="Square a given number",
    tools=[square_number],
    verbose=True,
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
)

# Aggregator agent to collect and summarize results
aggregator = Agent(
    name="Aggregator",
    role="Result aggregator",
    goal="Collect and summarize squared numbers",
    verbose=True,
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
)

# Define parallel tasks for each agent
task1 = Task(
    name="square_3",
    description="Square the number 3.",
    expected_output="The square of 3.",
    agent=agent1,
    is_start=True,
    async_execution=True
)
task2 = Task(
    name="square_7",
    description="Square the number 7.",
    expected_output="The square of 7.",
    agent=agent2,
    is_start=True,
    async_execution=True
)
task3 = Task(
    name="square_11",
    description="Square the number 11.",
    expected_output="The square of 11.",
    agent=agent3,
    is_start=True,
    async_execution=True
)

# Aggregator task that depends on the results of the above tasks
aggregate_task = Task(
    name="aggregate_results",
    description="Summarize the results of all square tasks.",
    expected_output="A summary of all squared numbers.",
    agent=aggregator,
    context=[task1, task2, task3]
)

async def main():
    # Set up the workflow manager
    workflow = PraisonAIAgents(
        agents=[agent1, agent2, agent3, aggregator],
        tasks=[task1, task2, task3, aggregate_task],
        process="workflow",
        verbose=True
    )

    # Run the workflow
    results = await workflow.astart()

    # Print results
    print("\nParallel Processing Results:")
    for task_id, result in results["task_results"].items():
        if result:
            print(f"Task {task_id}: {result.raw}")

if __name__ == "__main__":
    asyncio.run(main())

Alternatively, the same failure occurs with a distinct tool per agent:

# agentic_parallelization_varied_tasks.py

import asyncio
from praisonaiagents import Agent, Task, PraisonAIAgents

# Example tools (replace with real implementations as needed)
def fetch_favorite_article():
    # Simulate fetching your favorite morning article
    return "Your favorite morning article: 'How to Start Your Day Right'"

def search_trending_kenya():
    # Simulate searching for trending news in Kenya
    return "Trending in Kenya: 'Kenya launches new tech hub in Nairobi'"

def fetch_twitter_feed():
    # Simulate fetching Twitter feed
    return "Latest tweet: 'AI is transforming the world!'"

# Agents for each unique task
article_agent = Agent(
    name="ArticleAgent",
    role="Morning Article Fetcher",
    goal="Fetch the user's favorite morning article",
    tools=[fetch_favorite_article],
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
    verbose=True
)

news_agent = Agent(
    name="KenyaNewsAgent",
    role="Kenya News Searcher",
    goal="Search for trending news in Kenya",
    tools=[search_trending_kenya],
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
    verbose=True
)

twitter_agent = Agent(
    name="TwitterAgent",
    role="Twitter Feed Fetcher",
    goal="Fetch the latest Twitter feed",
    tools=[fetch_twitter_feed],
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
    verbose=True
)

aggregator = Agent(
    name="Aggregator",
    role="Result Aggregator",
    goal="Aggregate and summarize all results",
    llm="gemini/gemini-2.5-flash-lite-preview-06-17",
    verbose=True
)

# Tasks for each agent
article_task = Task(
    name="fetch_article",
    description="Fetch the user's favorite morning article.",
    expected_output="The favorite morning article.",
    agent=article_agent,
    is_start=True,
    async_execution=True
)

news_task = Task(
    name="search_kenya_news",
    description="Search for trending news in Kenya.",
    expected_output="Trending news in Kenya.",
    agent=news_agent,
    is_start=True,
    async_execution=True
)

twitter_task = Task(
    name="fetch_twitter",
    description="Fetch the latest Twitter feed.",
    expected_output="Latest Twitter feed.",
    agent=twitter_agent,
    is_start=True,
    async_execution=True
)

# Aggregator task that depends on the above tasks
aggregate_task = Task(
    name="aggregate_results",
    description="Summarize the article, news, and Twitter feed results.",
    expected_output="A summary of all fetched information.",
    agent=aggregator,
    context=[article_task, news_task, twitter_task]
)

async def main():
    workflow = PraisonAIAgents(
        agents=[article_agent, news_agent, twitter_agent, aggregator],
        tasks=[article_task, news_task, twitter_task, aggregate_task],
        process="workflow",
        verbose=True
    )
    results = await workflow.astart()

    print("\nParallel Processing Results:")
    for task_id, result in results["task_results"].items():
        if result:
            print(f"Task {task_id}: {result.raw}")

if __name__ == "__main__":
    asyncio.run(main())

Steps to Reproduce

  1. Install PraisonAI 2.2.53
  2. Copy either of the example scripts above
  3. Run the script and observe the start tasks executing repeatedly while the aggregator task never runs

Expected Behavior

  • Each parallel task (e.g., fetching an article, searching news, fetching Twitter feed) should execute once and complete.
  • After all parallel tasks have completed, the aggregator/summary task (which depends on their results) should execute.
  • The workflow should then finish, returning the results of all tasks (including the aggregator/summary).
  • The process should exit cleanly after all tasks are done — no repeated execution, no infinite loop, and no need to kill the process manually or rely on provider rate limits to stop it.
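For reference, the fan-out/fan-in control flow described above can be sketched with plain asyncio, independent of PraisonAI (the function names here are illustrative stand-ins, not library API):

```python
import asyncio

# Illustrative stand-in for the three parallel start tasks.
async def square(n: int) -> int:
    return n * n

async def aggregate(results: list[int]) -> str:
    # Runs only after every parallel task has finished.
    return "Squares: " + ", ".join(str(r) for r in results)

async def main() -> str:
    # Fan out: run all start tasks concurrently, each exactly once.
    results = await asyncio.gather(square(3), square(7), square(11))
    # Fan in: the aggregator consumes the completed results, then the
    # program exits cleanly -- the behavior expected from workflow mode.
    return await aggregate(list(results))

print(asyncio.run(main()))  # Squares: 9, 49, 121
```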

Actual Behavior

  • All parallel tasks (e.g., fetching an article, searching news, fetching the Twitter feed) start and run.
  • The same tasks are then executed again and again in a loop.
  • The aggregator/summary task that depends on the parallel tasks never executes.
  • The process never completes and must be killed manually or stopped by provider rate limits.
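Until the looping is fixed, a deadline guard around the run at least keeps the process from hanging indefinitely and hitting rate limits. This is a defensive sketch using only stdlib asyncio; the wrapper name and the 120-second budget are assumptions, and the coroutine passed in would be the `workflow.astart()` call from the scripts above:

```python
import asyncio

async def run_with_deadline(coro, timeout_s: float = 120.0):
    """Cancel the wrapped coroutine if it is still running after timeout_s seconds."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        print(f"Workflow did not finish within {timeout_s}s -- likely stuck in the task loop.")
        return None

# Usage inside main():
#     results = await run_with_deadline(workflow.astart())
```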

Additional Context

This issue occurs in workflow (parallelization) mode even with trivial, local tools, no LLM/network dependencies in the tools themselves, and a different agent and tool for each task.
No errors are thrown; the same tasks are simply executed over and over.
This is a separate issue from #862, which concerns async agents rather than agentic parallelization.

Metadata

Assignees: no one assigned
Labels: bug (Something isn't working), enhancement (New feature or request), help wanted (Extra attention is needed)
Milestone: none