Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add ai metrics #1169

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libs/appflowy-ai-client/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum OutputLayout {
SimpleTable = 3,
}

#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr)]
#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr, Eq, PartialEq)]
#[repr(u8)]
pub enum OutputContent {
#[default]
Expand All @@ -62,6 +62,12 @@ pub enum OutputContent {
RichTextImage = 2,
}

impl OutputContent {
pub fn is_image(&self) -> bool {
*self == OutputContent::IMAGE || *self == OutputContent::RichTextImage
}
}

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct OutputContentMetadata {
/// Custom prompt for image generation.
Expand Down
58 changes: 37 additions & 21 deletions src/api/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async fn answer_stream_handler(
chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);
state.metrics.ai_metrics.record_total_stream_count(1);
match state
.ai_client
.stream_question(
Expand All @@ -291,13 +292,16 @@ async fn answer_stream_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand All @@ -313,6 +317,7 @@ async fn answer_stream_v2_handler(
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);

state.metrics.ai_metrics.record_total_stream_count(1);
trace!(
"[Chat] stream answer for chat: {}, question: {}, rag_ids: {:?}",
chat_id,
Expand Down Expand Up @@ -340,13 +345,16 @@ async fn answer_stream_v2_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand All @@ -363,6 +371,10 @@ async fn answer_stream_v3_handler(
chat::chat_ops::select_chat_message_content(&state.pg_pool, payload.question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &payload.chat_id).await?;
let ai_model = ai_model_from_header(&req);
state.metrics.ai_metrics.record_total_stream_count(1);
if payload.format.output_content.is_image() {
state.metrics.ai_metrics.record_stream_image_count(1);
}

let question = ChatQuestion {
chat_id: payload.chat_id,
Expand All @@ -377,6 +389,7 @@ async fn answer_stream_v3_handler(
rag_ids,
},
};

trace!("[Chat] stream v3 {:?}", question);
match state
.ai_client
Expand All @@ -391,13 +404,16 @@ async fn answer_stream_v3_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand Down
46 changes: 46 additions & 0 deletions src/biz/chat/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use prometheus_client::metrics::counter::Counter;

#[derive(Default, Clone)]
pub struct AIMetrics {
total_stream_count: Counter,
failed_stream_count: Counter,
stream_image_count: Counter,
}

impl AIMetrics {
pub fn register(registry: &mut prometheus_client::registry::Registry) -> Self {
let metrics = Self::default();
let realtime_registry = registry.sub_registry_with_prefix("ai");

// Register each metric with the Prometheus registry
realtime_registry.register(
"total_stream_count",
"Total count of streams processed",
metrics.total_stream_count.clone(),
);
realtime_registry.register(
"failed_stream_count",
"Total count of failed streams",
metrics.failed_stream_count.clone(),
);
realtime_registry.register(
"image_stream_count",
"Total count of image streams processed",
metrics.stream_image_count.clone(),
);

metrics
}

pub fn record_total_stream_count(&self, count: u64) {
self.total_stream_count.inc_by(count);
}

pub fn record_failed_stream_count(&self, count: u64) {
self.failed_stream_count.inc_by(count);
}

pub fn record_stream_image_count(&self, count: u64) {
self.stream_image_count.inc_by(count);
}
}
1 change: 1 addition & 0 deletions src/biz/chat/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod metrics;
pub mod ops;
4 changes: 4 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use snowflake::Snowflake;
use tonic_proto::history::history_client::HistoryClient;

use crate::api::metrics::{AppFlowyWebMetrics, PublishedCollabMetrics, RequestMetrics};
use crate::biz::chat::metrics::AIMetrics;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::publish::PublishedCollabStore;
use crate::config::config::Config;
Expand Down Expand Up @@ -130,6 +131,7 @@ pub struct AppMetrics {
pub appflowy_web_metrics: Arc<AppFlowyWebMetrics>,
pub embedding_metrics: Arc<EmbeddingMetrics>,
pub collab_stream_metrics: Arc<CollabStreamMetrics>,
pub ai_metrics: Arc<AIMetrics>,
}

impl Default for AppMetrics {
Expand All @@ -149,6 +151,7 @@ impl AppMetrics {
let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry));
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry));
let ai_metrics = Arc::new(AIMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
request_metrics,
Expand All @@ -159,6 +162,7 @@ impl AppMetrics {
appflowy_web_metrics,
embedding_metrics,
collab_stream_metrics,
ai_metrics,
}
}
}
Expand Down
Loading