Skip to content

Commit 7e7f532

Browse files
authored
fix(crewai): CrewAI Flow Traces Not Generated With Async (#2308)
1 parent c80c81b commit 7e7f532

File tree

3 files changed

+19
-16
lines changed

3 files changed

+19
-16
lines changed

python/instrumentation/openinference-instrumentation-crewai/examples/advanced_flow.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
and display structured and human-readable results.
1212
"""
1313

14+
import asyncio
1415
import os
1516
from typing import Any, Dict, List, Optional
1617

@@ -186,7 +187,7 @@ def create_advanced_flow(flow_name: Optional[str] = None) -> Flow:
186187
return flow
187188

188189

189-
def run_advanced_flow():
190+
async def run_advanced_flow():
190191
"""
191192
Executes the advanced flow and handles any runtime exceptions.
192193
"""
@@ -197,7 +198,7 @@ def run_advanced_flow():
197198
# Inputs are passed here and injected into the @start() method
198199
inputs = {"product": "AI-powered Chatbots"}
199200

200-
flow.kickoff(inputs=inputs)
201+
await flow.kickoff_async(inputs=inputs)
201202
print("✅ Flow execution completed successfully.")
202203
except Exception as e:
203204
print(f"⚠️ Flow execution failed: {type(e).__name__}")
@@ -207,7 +208,7 @@ def main():
207208
"""Run the CrewAI instrumentation demonstration."""
208209
print("CrewAI Instrumentation Demo - Advanced Flow")
209210
print("Check Phoenix UI at http://localhost:6006 for trace visualization\n")
210-
run_advanced_flow()
211+
asyncio.run(run_advanced_flow())
211212

212213

213214
if __name__ == "__main__":

python/instrumentation/openinference-instrumentation-crewai/src/openinference/instrumentation/crewai/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from openinference.instrumentation.crewai._wrappers import (
1414
_CrewKickoffWrapper,
1515
_ExecuteCoreWrapper,
16-
_FlowKickoffWrapper,
16+
_FlowKickoffAsyncWrapper,
1717
_ToolUseWrapper,
1818
)
1919
from openinference.instrumentation.crewai.version import __version__
@@ -27,7 +27,7 @@ class CrewAIInstrumentor(BaseInstrumentor): # type: ignore
2727
__slots__ = (
2828
"_original_execute_core",
2929
"_original_crew_kickoff",
30-
"_original_flow_kickoff",
30+
"_original_flow_kickoff_async",
3131
"_original_tool_use",
3232
"_tracer",
3333
)
@@ -63,12 +63,14 @@ def _instrument(self, **kwargs: Any) -> None:
6363
wrapper=crew_kickoff_wrapper,
6464
)
6565

66-
flow_kickoff_wrapper = _FlowKickoffWrapper(tracer=self._tracer)
67-
self._original_flow_kickoff = getattr(import_module("crewai").Flow, "kickoff", None)
66+
flow_kickoff_async_wrapper = _FlowKickoffAsyncWrapper(tracer=self._tracer)
67+
self._original_flow_kickoff_async = getattr(
68+
import_module("crewai").Flow, "kickoff_async", None
69+
)
6870
wrap_function_wrapper(
6971
module="crewai",
70-
name="Flow.kickoff",
71-
wrapper=flow_kickoff_wrapper,
72+
name="Flow.kickoff_async",
73+
wrapper=flow_kickoff_async_wrapper,
7274
)
7375

7476
use_wrapper = _ToolUseWrapper(tracer=self._tracer)
@@ -92,10 +94,10 @@ def _uninstrument(self, **kwargs: Any) -> None:
9294
crew_module.Crew.kickoff = self._original_crew_kickoff
9395
self._original_crew_kickoff = None
9496

95-
if self._original_flow_kickoff is not None:
97+
if self._original_flow_kickoff_async is not None:
9698
crew_module = import_module("crewai")
97-
crew_module.Flow.kickoff = self._original_flow_kickoff
98-
self._original_flow_kickoff = None
99+
crew_module.Flow.kickoff_async = self._original_flow_kickoff_async
100+
self._original_flow_kickoff_async = None
99101

100102
if self._original_tool_use is not None:
101103
tool_usage_module = import_module("crewai.tools.tool_usage")

python/instrumentation/openinference-instrumentation-crewai/src/openinference/instrumentation/crewai/_wrappers.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,19 +327,19 @@ def __call__(
327327
return crew_output
328328

329329

330-
class _FlowKickoffWrapper:
330+
class _FlowKickoffAsyncWrapper:
331331
def __init__(self, tracer: trace_api.Tracer) -> None:
332332
self._tracer = tracer
333333

334-
def __call__(
334+
async def __call__(
335335
self,
336336
wrapped: Callable[..., Any],
337337
instance: Any,
338338
args: Tuple[Any, ...],
339339
kwargs: Mapping[str, Any],
340340
) -> Any:
341341
if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
342-
return wrapped(*args, **kwargs)
342+
return await wrapped(*args, **kwargs)
343343
# Enhanced flow naming - use meaningful flow name instead of generic "Flow.kickoff"
344344
flow_name = _get_flow_name(instance)
345345
span_name = f"{flow_name}.kickoff"
@@ -365,7 +365,7 @@ def __call__(
365365
span.set_attribute("flow_inputs", json.dumps(inputs) if inputs else "")
366366

367367
try:
368-
flow_output = wrapped(*args, **kwargs)
368+
flow_output = await wrapped(*args, **kwargs)
369369
except Exception as exception:
370370
span.set_status(trace_api.Status(trace_api.StatusCode.ERROR, str(exception)))
371371
span.record_exception(exception)

0 commit comments

Comments
 (0)