
Commit b21e355

Merge branch 'release/0.0.4'

Author: Davidson Gomes
2 parents: 782c2ac + 0c69df1

File tree: 2 files changed (+61, -10 lines)


README.md

Lines changed: 45 additions & 0 deletions
```diff
@@ -301,6 +301,51 @@ Authorization: Bearer your-token-jwt
 - **LangGraph**: Framework for building stateful, multi-agent workflows
 - **ReactFlow**: Library for building node-based visual workflows
 
+## 📊 Langfuse Integration (Tracing & Observability)
+
+The Evo AI platform natively supports integration with [Langfuse](https://langfuse.com/) for detailed tracing of agent executions, prompts, model responses, and tool calls, using the OpenTelemetry (OTel) standard.
+
+### Why use Langfuse?
+
+- Visual dashboard for agent traces, prompts, and executions
+- Detailed analytics for debugging and evaluating LLM apps
+- Easy integration with Google ADK and other frameworks
+
+### How it works
+
+- Every agent execution (including streaming) is automatically traced via OpenTelemetry spans
+- Data is sent to Langfuse, where it can be visualized and analyzed
+
+### How to configure
+
+1. **Set the environment variables in your `.env`:**
+
+   ```env
+   LANGFUSE_PUBLIC_KEY="pk-lf-..."   # Your Langfuse public key
+   LANGFUSE_SECRET_KEY="sk-lf-..."   # Your Langfuse secret key
+   OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"   # (or us.cloud... for the US region)
+   ```
+
+   > **Attention:** Do not swap the keys! `pk-...` is public, `sk-...` is secret.
+
+2. **Automatic initialization**
+
+   - Tracing is initialized automatically when the application starts (`src/main.py`).
+   - The agent execution functions are already instrumented with spans (`src/services/agent_runner.py`).
+
+3. **View traces in the Langfuse dashboard**
+   - Open your Langfuse dashboard to see traces in real time.
+
+### Troubleshooting
+
+- **401 error (invalid credentials):**
+  - Check that the keys are correct and not swapped in your `.env`.
+  - Make sure the endpoint matches your region (EU or US).
+- **Context error in async generators:**
+  - The code is already adjusted to avoid OpenTelemetry context issues in async generators.
+- **Questions about the integration:**
+  - See the [official Langfuse documentation - Google ADK](https://langfuse.com/docs/integrations/google-adk)
+
 ## 🤖 Agent 2 Agent (A2A) Protocol Support
 
 Evo AI implements Google's Agent 2 Agent (A2A) protocol, enabling seamless communication and interoperability between AI agents. This implementation includes:
```
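The README states that tracing is wired up automatically in `src/main.py`, but that initialization code is not part of this commit. Below is a minimal sketch of what such a setup can look like with the OpenTelemetry SDK, assuming the Langfuse OTel endpoint accepts OTLP over HTTP with Basic auth built from the public/secret key pair; the `init_tracing` helper, the `evo-ai` service name, and the `run_agent_traced` wrapper are illustrative names, not the project's actual code.

```python
import base64
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def init_tracing() -> None:
    """Illustrative setup: export OTel spans to the Langfuse OTLP endpoint."""
    # Assumption: Langfuse authenticates OTLP requests with Basic auth,
    # public key as username and secret key as password.
    auth = base64.b64encode(
        f"{os.environ['LANGFUSE_PUBLIC_KEY']}:{os.environ['LANGFUSE_SECRET_KEY']}".encode()
    ).decode()

    exporter = OTLPSpanExporter(
        # The .env endpoint is the OTLP base URL; the traces signal lives under /v1/traces.
        endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"].rstrip("/") + "/v1/traces",
        headers={"Authorization": f"Basic {auth}"},
    )

    provider = TracerProvider(resource=Resource.create({"service.name": "evo-ai"}))
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)


# Illustrative wrapper in the spirit of the span instrumentation the README
# mentions for src/services/agent_runner.py.
tracer = trace.get_tracer("evo-ai.agent-runner")


async def run_agent_traced(agent_id: str, message: str) -> str:
    with tracer.start_as_current_span("agent_execution") as span:
        span.set_attribute("agent.id", agent_id)
        span.set_attribute("input.preview", message[:200])
        # ... call the real agent runner here ...
        return "agent response"
```

In this sketch the two Langfuse keys are only ever combined into the `Authorization` header; swapping them is exactly what produces the 401 error listed under Troubleshooting.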

src/services/a2a_task_manager.py

Lines changed: 16 additions & 10 deletions
```diff
@@ -3,6 +3,7 @@
 from collections.abc import AsyncIterable
 from typing import Dict, Optional
 from uuid import UUID
+import json
 
 from sqlalchemy.orm import Session
 
@@ -306,7 +307,7 @@ async def _stream_task_process(
         external_id = task_params.sessionId
         full_response = ""
 
-        # We use the same streaming function used in the WebSocket
+        # Use the same pattern as chat_routes.py: deserialize each chunk
         async for chunk in run_agent_stream(
             agent_id=str(agent.id),
             external_id=external_id,
@@ -316,9 +317,14 @@ async def _stream_task_process(
             memory_service=memory_service,
             db=self.db,
         ):
-            # Send incremental progress updates
-            update_text_part = {"type": "text", "text": chunk}
-            update_message = Message(role="agent", parts=[update_text_part])
+            try:
+                chunk_data = json.loads(chunk)
+            except Exception as e:
+                logger.warning(f"Invalid chunk received: {chunk} - {e}")
+                continue
+
+            # The chunk_data must be a dict with 'type' and 'text' (or other expected format)
+            update_message = Message(role="agent", parts=[chunk_data])
 
             # Update the task with each intermediate message
             await self.update_store(
@@ -337,24 +343,24 @@ async def _stream_task_process(
                     final=False,
                 ),
             )
-            full_response += chunk
+            # If it's text, accumulate for the final response
+            if chunk_data.get("type") == "text":
+                full_response += chunk_data.get("text", "")
 
-        # Determine the task state
+        # Determine the final state of the task
         task_state = (
             TaskState.INPUT_REQUIRED
             if "MISSING_INFO:" in full_response
             else TaskState.COMPLETED
         )
 
-        # Create the final response part
+        # Create the final response
         final_text_part = {"type": "text", "text": full_response}
         parts = [final_text_part]
         final_message = Message(role="agent", parts=parts)
-
-        # Create the final artifact from the final response
         final_artifact = Artifact(parts=parts, index=0)
 
-        # Update the task in the store with the final response
+        # Update the task with the final response
         await self.update_store(
             task_params.id,
             TaskStatus(state=task_state, message=final_message),
```
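To make the new chunk handling concrete, here is a small self-contained sketch of the same parse-and-accumulate pattern outside the task manager. The hard-coded `chunks` list and the synchronous loop are illustrative only; in the actual code the chunks come from `run_agent_stream` and each valid part is forwarded to `update_store` as an incremental update.

```python
import json
import logging

logger = logging.getLogger(__name__)

# Illustrative chunks in the shape the diff assumes run_agent_stream yields:
# each chunk is a JSON string that decodes to a message part dict.
chunks = [
    '{"type": "text", "text": "Hello"}',
    "not-json",  # malformed chunk: logged and skipped
    '{"type": "text", "text": ", world"}',
]

full_response = ""
parts = []

for chunk in chunks:
    try:
        chunk_data = json.loads(chunk)
    except Exception as e:
        logger.warning(f"Invalid chunk received: {chunk} - {e}")
        continue

    # Every valid part would be sent out as an incremental update...
    parts.append(chunk_data)

    # ...while only text parts are accumulated into the final response.
    if chunk_data.get("type") == "text":
        full_response += chunk_data.get("text", "")

print(full_response)  # -> Hello, world
```

Non-text parts still reach the client as intermediate messages but are kept out of `full_response`, which is also the string the `MISSING_INFO:` check inspects when deciding the final task state.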
