Skip to content

Commit

Permalink
fix: workflow trace user_id error (langgenius#6932)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouhaoJiang authored Aug 3, 2024
1 parent bcd7c8e commit 26e46d3
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 7 deletions.
3 changes: 2 additions & 1 deletion api/core/app/apps/agent_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def generate(self, app_model: App,
)

# get tracing instance
trace_manager = TraceQueueManager(app_model.id)
user_id = user.id if isinstance(user, Account) else user.session_id
trace_manager = TraceQueueManager(app_model.id, user_id)

# init application generate entity
application_generate_entity = AgentChatAppGenerateEntity(
Expand Down
3 changes: 2 additions & 1 deletion api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def generate(
)

# get tracing instance
trace_manager = TraceQueueManager(app_model.id)
user_id = user.id if isinstance(user, Account) else user.session_id
trace_manager = TraceQueueManager(app_model.id, user_id)

# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
Expand Down
2 changes: 2 additions & 0 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def _workflow_run_success(
TraceTaskName.WORKFLOW_TRACE,
workflow_run=workflow_run,
conversation_id=conversation_id,
user_id=trace_manager.user_id,
)
)

Expand Down Expand Up @@ -173,6 +174,7 @@ def _workflow_run_failed(
TraceTaskName.WORKFLOW_TRACE,
workflow_run=workflow_run,
conversation_id=conversation_id,
user_id=trace_manager.user_id,
)
)

Expand Down
5 changes: 3 additions & 2 deletions api/core/ops/langfuse_trace/langfuse_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ def trace(self, trace_info: BaseTraceInfo):

def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id
user_id = trace_info.metadata.get("user_id")
if trace_info.message_id:
trace_id = trace_info.message_id
name = f"message_{trace_info.message_id}"
trace_data = LangfuseTrace(
id=trace_info.message_id,
user_id=trace_info.tenant_id,
user_id=user_id,
name=name,
input=trace_info.workflow_run_inputs,
output=trace_info.workflow_run_outputs,
Expand All @@ -95,7 +96,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
else:
trace_data = LangfuseTrace(
id=trace_id,
user_id=trace_info.tenant_id,
user_id=user_id,
name=f"workflow_{trace_info.workflow_app_log_id}" if trace_info.workflow_app_log_id else f"workflow_{trace_info.workflow_run_id}",
input=trace_info.workflow_run_inputs,
output=trace_info.workflow_run_outputs,
Expand Down
12 changes: 9 additions & 3 deletions api/core/ops/ops_trace_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,15 @@ def __init__(
message_id: Optional[str] = None,
workflow_run: Optional[WorkflowRun] = None,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
timer: Optional[Any] = None,
**kwargs
):
self.trace_type = trace_type
self.message_id = message_id
self.workflow_run = workflow_run
self.conversation_id = conversation_id
self.user_id = user_id
self.timer = timer
self.kwargs = kwargs
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
Expand All @@ -290,7 +292,9 @@ def execute(self):
def preprocess(self):
preprocess_map = {
TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(self.workflow_run, self.conversation_id),
TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
self.workflow_run, self.conversation_id, self.user_id
),
TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(self.message_id),
TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
self.message_id, self.timer, **self.kwargs
Expand All @@ -313,7 +317,7 @@ def preprocess(self):
def conversation_trace(self, **kwargs):
return kwargs

def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
def workflow_trace(self, workflow_run: WorkflowRun, conversation_id, user_id):
workflow_id = workflow_run.workflow_id
tenant_id = workflow_run.tenant_id
workflow_run_id = workflow_run.id
Expand Down Expand Up @@ -358,6 +362,7 @@ def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
"total_tokens": total_tokens,
"file_list": file_list,
"triggered_form": workflow_run.triggered_from,
"user_id": user_id,
}

workflow_trace_info = WorkflowTraceInfo(
Expand Down Expand Up @@ -654,10 +659,11 @@ def generate_name_trace(self, conversation_id, timer, **kwargs):


class TraceQueueManager:
def __init__(self, app_id=None):
def __init__(self, app_id=None, user_id=None):
global trace_manager_timer

self.app_id = app_id
self.user_id = user_id
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
self.flask_app = current_app._get_current_object()
if trace_manager_timer is None:
Expand Down

0 comments on commit 26e46d3

Please sign in to comment.