Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions lib/crewai/tests/test_crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -4856,3 +4856,200 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
assert "Researcher" in messages[0]["content"]
assert messages[1]["role"] == "user"
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]


def test_crew_copy_inside_typed_flow():
"""Tests that crew.copy() works inside a Flow[MyState] without Pydantic validation errors.

Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""

class MyState(BaseModel):
question: str = ""

captured_copy = None

class MyFlow(Flow[MyState]):
@start()
def run_batch(self):
nonlocal captured_copy
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
captured_copy = crew.copy()
return "done"

flow = MyFlow()
flow.kickoff()

assert captured_copy is not None
assert len(captured_copy.agents) == 1
assert len(captured_copy.tasks) == 1


def test_kickoff_for_each_inside_flow():
"""Tests that kickoff_for_each works when called inside a Flow method.

Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""

class MyState(BaseModel):
question: str = ""

captured_results = None

class MyFlow(Flow[MyState]):
@start()
def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)

inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]

mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
captured_results = crew.kickoff_for_each(inputs=inputs)
return "done"

flow = MyFlow()
flow.kickoff()

assert captured_results is not None
assert len(captured_results) == 2


@pytest.mark.asyncio
async def test_kickoff_for_each_async_inside_flow():
"""Tests that kickoff_for_each_async works when called inside a Flow method.

Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""

class MyState(BaseModel):
question: str = ""

captured_results = None

class MyFlow(Flow[MyState]):
@start()
async def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)

inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]

mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(
Crew, "kickoff_async", return_value=mock_output
):
captured_results = await crew.kickoff_for_each_async(
inputs=inputs
)
return "done"

flow = MyFlow()
await flow.kickoff_async()

assert captured_results is not None
assert len(captured_results) == 2


@pytest.mark.asyncio
async def test_akickoff_for_each_inside_flow():
"""Tests that akickoff_for_each works when called inside a Flow method.

Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""

class MyState(BaseModel):
question: str = ""

captured_results = None

class MyFlow(Flow[MyState]):
@start()
async def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)

inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]

mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)

async def mock_akickoff(**kwargs):
return mock_output

with patch.object(Crew, "akickoff", side_effect=mock_akickoff):
captured_results = await crew.akickoff_for_each(inputs=inputs)
return "done"

flow = MyFlow()
await flow.kickoff_async()

assert captured_results is not None
assert len(captured_results) == 2
Loading