This project implements a CrewAI-based workflow system with comprehensive MongoDB persistence for tracking every step of the workflow execution.
- MongoDB Persistence: Every step of the workflow is persisted to MongoDB for audit trails and debugging
- Workflow Tracking: Complete visibility into the execution flow with timestamps and step data
- Error Handling: Comprehensive error tracking and recovery mechanisms
- Chat History: Redis-based chat history management
- RAG Integration: Retrieval-Augmented Generation pipeline for data source queries
Main components:
- MongoPersistence: Handles all MongoDB operations for workflow state tracking
- AgenticHivemindFlow: CrewAI flow that orchestrates the agent interactions
- run_hivemind_agent_activity: Temporal activity that manages the workflow execution
- QueryDataSources: Handles RAG queries with workflow ID tracking
The system tracks the following steps in MongoDB:
- initialization: Initial workflow setup with parameters
- chat_history_retrieval: Redis chat history retrieval (if applicable)
- no_chat_history: When no chat history is available
- flow_initialization: AgenticHivemindFlow setup
- flow_execution_start: Beginning of CrewAI flow execution
- local_model_classification: Local transformer model classification result
- question_classification: Language model question classification with reasoning
- rag_classification: RAG question classification with score and reasoning
- history_query_classification: History vs RAG query classification (if applicable)
- flow_execution_complete: Completion of CrewAI flow
- answer_processing: Processing of the final answer
- error_handling: Any error handling steps
- memory_update: Redis memory updates (if applicable)
- error_occurred: Any errors during execution
Use `.env.example` as a template to prepare your `.env` file.
The classification steps persist their results and reasoning, which supports audit trails and debugging:
- Step Name: `local_model_classification`
  - Data: Result from the local transformer model
  - Model: `local_transformer`
- Step Name: `question_classification`
  - Data:
    - `result`: Boolean indicating whether the message is a question
    - `reasoning`: Detailed explanation for the classification
    - `model`: `language_model`
    - `query`: Original user query
- Step Name: `rag_classification`
  - Data:
    - `result`: Boolean indicating whether RAG is needed
    - `score`: Sensitivity score (0 to 1)
    - `reasoning`: Detailed explanation for the score
    - `model`: `language_model`
    - `query`: Original user query
- Step Name: `history_query_classification`
  - Data:
    - `result`: Boolean indicating whether it is a history query
    - `model`: `openai_gpt4`
    - `query`: Original user query
    - `hasChatHistory`: Boolean indicating whether chat history was available
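The `data` objects listed for the classification steps could be assembled by small helper functions like the ones below. This is a minimal sketch. The function names `rag_classification_data` and `history_query_classification_data` are hypothetical and are not part of this project. Only the field names and model labels come from the list above. The 0 to 1 range check on `score` follows the documented range.

```python
from typing import Any


def rag_classification_data(
    result: bool, score: float, reasoning: str, query: str
) -> dict[str, Any]:
    """Hypothetical helper: build the `data` object for a rag_classification step."""
    # The sensitivity score is documented as lying between 0 and 1.
    if not 0.0 <= score <= 1.0:
        raise ValueError("score must be between 0 and 1")
    return {
        "result": result,
        "score": score,
        "reasoning": reasoning,
        "model": "language_model",
        "query": query,
    }


def history_query_classification_data(
    result: bool, query: str, has_chat_history: bool
) -> dict[str, Any]:
    """Hypothetical helper: build the `data` object for a history_query_classification step."""
    return {
        "result": result,
        "model": "openai_gpt4",
        "query": query,
        # Stored in camelCase to match the documented field name.
        "hasChatHistory": has_chat_history,
    }
```

Validating the score when the payload is built keeps malformed values out of the audit trail.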
The workflow states are stored in the `internal_messages` collection with the following structure:

```json
{
  "_id": "ObjectId",
  "communityId": "string",
  "route": {
    "source": "string",
    "destination": {
      "queue": "string",
      "event": "string"
    }
  },
  "question": {
    "message": "string",
    "filters": "object (optional)"
  },
  "response": {
    "message": "string"
  },
  "metadata": "object",
  "createdAt": "datetime",
  "updatedAt": "datetime",
  "steps": [
    {
      "stepName": "string",
      "timestamp": "datetime",
      "data": "object"
    }
  ],
  "currentStep": "string",
  "status": "string",
  "chatId": "string (optional)",
  "enableAnswerSkipping": "boolean"
}
```
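Recording a step in this structure can be expressed as a single MongoDB update. The update appends to `steps` and refreshes `currentStep` and `updatedAt`. The sketch below only builds the update document, so it runs without a database. The function name `build_step_update` is hypothetical. It is an assumption that `MongoPersistence` updates documents this way. The result has the shape accepted by pymongo's `Collection.update_one(filter, update)`.

```python
from datetime import datetime, timezone
from typing import Any, Optional

# Step names tracked by the workflow, as listed in this README.
KNOWN_STEPS = {
    "initialization", "chat_history_retrieval", "no_chat_history",
    "flow_initialization", "flow_execution_start", "local_model_classification",
    "question_classification", "rag_classification", "history_query_classification",
    "flow_execution_complete", "answer_processing", "error_handling",
    "memory_update", "error_occurred",
}


def build_step_update(
    step_name: str,
    data: Optional[dict[str, Any]] = None,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Hypothetical helper: return an update document that appends one step."""
    if step_name not in KNOWN_STEPS:
        raise ValueError(f"unknown step: {step_name}")
    timestamp = now if now is not None else datetime.now(timezone.utc)
    return {
        # Append the step record to the `steps` array.
        "$push": {
            "steps": {"stepName": step_name, "timestamp": timestamp, "data": data or {}}
        },
        # Keep the summary fields in sync with the latest step.
        "$set": {"currentStep": step_name, "updatedAt": timestamp},
    }
```

Because `$push` and `$set` are applied in one atomic update, a step record and `currentStep` cannot drift apart.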
Start the worker:

```bash
python worker.py
```
You can query the MongoDB collection to inspect workflow execution:
```python
from tasks.mongo_persistence import MongoPersistence

persistence = MongoPersistence()
# Replace with the ID of the workflow you want to inspect.
workflow_state = persistence.get_workflow_state("workflow_id_here")
print(workflow_state)
```
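A full workflow document can be long. A small summary of the returned state makes it quicker to see where a run stopped. This is a sketch under two assumptions: `get_workflow_state` returns the document as a dict, and it returns `None` when no workflow matches. The function name `summarize_workflow` is hypothetical.

```python
from typing import Any, Optional


def summarize_workflow(workflow_state: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical helper: condense a workflow document for quick debugging."""
    # Assumption: a missing workflow is represented as None.
    if workflow_state is None:
        return {"found": False}
    steps = workflow_state.get("steps", [])
    return {
        "found": True,
        "status": workflow_state.get("status"),
        "currentStep": workflow_state.get("currentStep"),
        # Step names in the order they were recorded.
        "stepNames": [step.get("stepName") for step in steps],
        # Data from every error_occurred step, if any.
        "errors": [
            step.get("data") for step in steps if step.get("stepName") == "error_occurred"
        ],
    }
```

For example, `print(summarize_workflow(workflow_state))` shows the step sequence and any recorded errors without the full document.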
Run the unit tests:
```bash
python -m pytest tests/unit/test_mongo_persistence.py
```
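Persistence logic can be unit-tested without a live MongoDB instance. One way is to pass in a `unittest.mock.MagicMock` in place of the pymongo collection. The sketch below assumes this approach. It is not a copy of the project's tests. `record_step` is a hypothetical helper, and it is an assumption that the workflow ID is used as the document `_id`.

```python
from typing import Any
from unittest.mock import MagicMock


def record_step(collection: Any, workflow_id: Any, step_name: str, data: dict) -> None:
    """Hypothetical helper: append a step to a workflow document."""
    collection.update_one(
        {"_id": workflow_id},  # assumption: the workflow ID is the document _id
        {
            "$push": {"steps": {"stepName": step_name, "data": data}},
            "$set": {"currentStep": step_name},
        },
    )


def test_record_step_pushes_step() -> None:
    collection = MagicMock()  # stands in for a pymongo Collection
    record_step(collection, "wf-1", "answer_processing", {"answer": "ok"})
    collection.update_one.assert_called_once()
    filter_doc, update_doc = collection.update_one.call_args.args
    assert filter_doc == {"_id": "wf-1"}
    assert update_doc["$push"]["steps"]["stepName"] == "answer_processing"
    assert update_doc["$set"]["currentStep"] == "answer_processing"
```

pytest picks up `test_`-prefixed functions automatically. The mock records every call, so the test can inspect the exact filter and update documents.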
Main dependencies:
- `pymongo==4.8.0`: MongoDB driver
- `redis==5.2.0`: Redis client
- `crewai==0.105.0`: AI agent framework
- `temporalio`: Temporal workflow engine
- `openai==1.66.3`: OpenAI API client
The workflow ID is passed through the entire execution chain:
- Created in `run_hivemind_agent_activity`
- Passed to `AgenticHivemindFlow`
- Passed to `RAGPipelineTool`
- Passed to `QueryDataSources`
- Included in `HivemindQueryPayload` for the `HivemindWorkflow`
This ensures complete traceability from the initial query to the final response.
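The propagation pattern above can be illustrated with a simplified chain of classes. The ID is created once at the entry point and then passed explicitly to each layer, so it is never regenerated. Every class and function name below is a hypothetical stand-in and does not reflect the real constructors of the components named above. Using `uuid4` to generate the ID is also an assumption.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class QueryPayload:
    """Hypothetical stand-in for a payload such as HivemindQueryPayload."""
    query: str
    workflow_id: str


class DataSourceQuery:
    """Hypothetical stand-in for the RAG query layer."""

    def __init__(self, workflow_id: str) -> None:
        self.workflow_id = workflow_id

    def build_payload(self, query: str) -> QueryPayload:
        # Attach the existing ID to the outgoing payload.
        return QueryPayload(query=query, workflow_id=self.workflow_id)


class RagTool:
    """Hypothetical stand-in for the RAG pipeline tool."""

    def __init__(self, workflow_id: str) -> None:
        self.workflow_id = workflow_id

    def run(self, query: str) -> QueryPayload:
        return DataSourceQuery(self.workflow_id).build_payload(query)


class Flow:
    """Hypothetical stand-in for the agent flow."""

    def __init__(self, workflow_id: str) -> None:
        self.rag_tool = RagTool(workflow_id)

    def answer(self, query: str) -> QueryPayload:
        return self.rag_tool.run(query)


def run_activity(query: str) -> tuple[str, QueryPayload]:
    # The ID is created exactly once, at the entry point (assumed uuid4 here).
    workflow_id = str(uuid.uuid4())
    return workflow_id, Flow(workflow_id).answer(query)
```

Passing the ID explicitly through constructors, rather than reading it from global state, lets each layer tag its records with the same ID even when several workflows run concurrently in one worker.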