Skip to content

feat: prism post datasets API #1236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ impl ParseableServer for QueryServer {
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
.service(Server::get_prism_logstream())
.service(Server::get_prism_datasets()),
)
.service(Server::get_generated());
}
Expand Down
14 changes: 13 additions & 1 deletion src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ impl ParseableServer for Server {
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
.service(Server::get_prism_logstream())
.service(Server::get_prism_datasets()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
Expand Down Expand Up @@ -180,6 +181,17 @@ impl Server {
)
}

/// Builds the `/datasets` scope for the Prism API.
///
/// Registers a single POST route on the scope root that is handled by
/// [`http::prism_logstream::post_datasets`]. The handler requires the caller
/// to be authorized for stream info, stats and retention on the requested
/// streams.
pub fn get_prism_datasets() -> Scope {
    // Assemble the route first, then attach it to the scope; the three
    // per-stream authorization layers mirror the data the handler returns.
    let datasets_route = web::post()
        .to(http::prism_logstream::post_datasets)
        .authorize_for_stream(Action::GetStreamInfo)
        .authorize_for_stream(Action::GetStats)
        .authorize_for_stream(Action::GetRetention);

    web::scope("/datasets").route("", datasets_route)
}

pub fn get_metrics_webscope() -> Scope {
web::scope("/metrics").service(
web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
Expand Down
24 changes: 21 additions & 3 deletions src/handlers/http/prism_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,33 @@
*/

use actix_web::{
web::{self, Path},
Responder,
web::{self, Json, Path},
HttpRequest, Responder,
};

use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError};
use crate::{
prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError},
utils::actix::extract_session_key_from_req,
};

/// Combined logstream info handler: merges what the `/info` and `/schema`
/// endpoints would return for the named stream into one JSON response.
pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
    // Fetch the merged logstream information and hand it back as JSON;
    // `?` propagates any PrismLogstreamError to the error responder.
    Ok(web::Json(get_prism_logstream_info(&stream_name).await?))
}

/// A combination of /stats, /retention, /hottier, /info, /counts and /query
pub async fn post_datasets(
dataset_req: Option<Json<PrismDatasetRequest>>,
req: HttpRequest,
) -> Result<impl Responder, PrismLogstreamError> {
let session_key = extract_session_key_from_req(&req)?;
let dataset = dataset_req
.map(|Json(r)| r)
.unwrap_or_default()
.get_datasets(session_key)
.await?;

Ok(web::Json(dataset))
}
190 changes: 188 additions & 2 deletions src/prism/logstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use actix_web::http::header::ContentType;
use arrow_schema::Schema;
use chrono::Utc;
use http::StatusCode;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tracing::{debug, warn};

use crate::{
handlers::http::{
Expand All @@ -31,11 +33,18 @@ use crate::{
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
},
logstream::error::StreamError,
query::update_schema_when_distributed,
query::{into_query, update_schema_when_distributed, Query, QueryError},
},
hottier::{HotTierError, HotTierManager, StreamHotTier},
parseable::{StreamNotFound, PARSEABLE},
query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION},
rbac::{map::SessionKey, role::Action, Users},
stats,
storage::{retention::Retention, StreamInfo, StreamType},
utils::{
arrow::record_batches_to_json,
time::{TimeParseError, TimeRange},
},
LOCK_EXPECT,
};

Expand Down Expand Up @@ -185,6 +194,168 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
Ok(stream_info)
}

/// Response structure for Prism dataset queries.
/// Contains information about a stream, its statistics, retention policy,
/// and query results.
///
/// Serialized as-is with `serde`; field names become the JSON keys.
#[derive(Serialize)]
pub struct PrismDatasetResponse {
    /// Name of the stream
    stream: String,
    /// Basic information about the stream
    info: StreamInfo,
    /// Statistics for the queried timeframe
    stats: QueriedStats,
    /// Retention policy details
    retention: Retention,
    /// Hot tier information if available (None when no HotTierManager is configured)
    hottier: Option<StreamHotTier>,
    /// Count of records in the specified time range
    counts: CountsResponse,
    /// Collection of distinct values for source identifiers; a JSON object
    /// with "ips" and "user_agents" keys (see `get_datasets`), each nullable
    distinct_sources: Value,
}

/// Request parameters for retrieving Prism dataset information.
/// Defines which streams to query.
///
/// Deserialized from camelCase JSON; `Default` yields an empty stream list,
/// which is treated downstream as "all streams the caller can access".
#[derive(Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct PrismDatasetRequest {
    /// List of stream names to query; `#[serde(default)]` makes the key optional
    #[serde(default)]
    streams: Vec<String>,
}

impl PrismDatasetRequest {
    /// Retrieves dataset information for all specified streams.
    ///
    /// Processes each stream in the request and compiles their information.
    /// Streams that don't exist or can't be accessed are skipped.
    ///
    /// # Returns
    /// - `Ok(Vec<PrismDatasetResponse>)`: List of responses for successfully processed streams
    /// - `Err(PrismLogstreamError)`: If a critical error occurs during processing
    ///
    /// # Note
    /// 1. This method won't fail if individual streams fail - it will only include
    ///    successfully processed streams in the result.
    /// 2. On receiving an empty stream list, we return for all streams the user is able to query for
    pub async fn get_datasets(
        mut self,
        key: SessionKey,
    ) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
        // Remember whether the caller supplied an explicit list; when the list
        // was auto-expanded from PARSEABLE, authorization misses are expected
        // and not worth a warning (see below).
        let is_empty = self.streams.is_empty();
        if is_empty {
            self.streams = PARSEABLE.streams.list();
        }

        let mut responses = vec![];
        for stream in self.streams.iter() {
            // Skip streams this session is not authorized to list.
            if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
                != crate::rbac::Response::Authorized
            {
                // Don't warn if listed from Parseable
                if !is_empty {
                    warn!("Unauthorized access requested for stream: {stream}");
                }
                continue;
            }

            // NOTE(review): a truthy `check_or_load_stream` is treated as
            // "stream unavailable" here — confirm against its definition.
            if PARSEABLE.check_or_load_stream(stream).await {
                debug!("Stream not found: {stream}");
                continue;
            }

            // Reuse the single-stream Prism info (info/schema/stats/retention);
            // the schema part is discarded via `..`.
            let PrismLogstreamInfo {
                info,
                stats,
                retention,
                ..
            } = get_prism_logstream_info(stream).await?;

            // Hot tier details are only available when a manager is configured.
            let hottier = match HotTierManager::global() {
                Some(hot_tier_manager) => {
                    let stats = hot_tier_manager.get_hot_tier(stream).await?;
                    Some(stats)
                }
                _ => None,
            };
            // Record-count histogram over the trailing hour, in 10 bins.
            let records = CountsRequest {
                stream: stream.clone(),
                start_time: "1h".to_owned(),
                end_time: "now".to_owned(),
                num_bins: 10,
            }
            .get_bin_density()
            .await?;
            let counts = CountsResponse {
                fields: vec!["start_time".into(), "end_time".into(), "count".into()],
                records,
            };

            // Retrieve distinct values for source identifiers
            // Returns None if fields aren't present or if query fails
            // (errors are deliberately swallowed with `.ok()` — best effort).
            let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
            let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();

            responses.push(PrismDatasetResponse {
                stream: stream.clone(),
                info,
                stats,
                retention,
                hottier,
                counts,
                distinct_sources: json!({
                    "ips": ips,
                    "user_agents": user_agents
                }),
            })
        }

        Ok(responses)
    }

    /// Retrieves distinct values for a specific field in a stream.
    ///
    /// # Parameters
    /// - `stream_name`: Name of the stream to query
    /// - `field`: Field name to get distinct values for
    ///
    /// # Returns
    /// - `Ok(Vec<String>)`: List of distinct values found for the field
    /// - `Err(QueryError)`: If the query fails or field doesn't exist
    async fn get_distinct_entries(
        &self,
        stream_name: &str,
        field: &str,
    ) -> Result<Vec<String>, QueryError> {
        // NOTE(review): "FOR" is not standard SQL (usually FROM) — presumably
        // accepted by the project's query layer via `into_query`; verify this
        // parses as intended, since any failure is silently dropped by the
        // caller's `.ok()`.
        let query = Query {
            query: format!("SELECT DISTINCT({field}) FOR {stream_name}"),
            start_time: "1h".to_owned(),
            end_time: "now".to_owned(),
            send_null: false,
            filter_tags: None,
            fields: true,
        };
        // Same 1h-to-now window as the query string above.
        let time_range = TimeRange::parse_human_time("1h", "now")?;

        let session_state = QUERY_SESSION.state();
        let query = into_query(&query, &session_state, time_range).await?;
        let (records, _) = execute(query, stream_name).await?;
        let response = record_batches_to_json(&records)?;
        // Extract field values from the JSON response
        // (rows missing the field or with non-string values are skipped).
        let values = response
            .iter()
            .flat_map(|row| {
                row.get(field)
                    .and_then(|s| s.as_str())
                    .map(|s| s.to_string())
            })
            .collect();

        Ok(values)
    }
}

#[derive(Debug, thiserror::Error)]
pub enum PrismLogstreamError {
#[error("Error: {0}")]
Expand All @@ -193,6 +364,16 @@ pub enum PrismLogstreamError {
StreamError(#[from] StreamError),
#[error("StreamNotFound: {0}")]
StreamNotFound(#[from] StreamNotFound),
#[error("Hottier: {0}")]
Hottier(#[from] HotTierError),
#[error("Query: {0}")]
Query(#[from] QueryError),
#[error("TimeParse: {0}")]
TimeParse(#[from] TimeParseError),
#[error("Execute: {0}")]
Execute(#[from] ExecuteError),
#[error("Auth: {0}")]
Auth(#[from] actix_web::Error),
}

impl actix_web::ResponseError for PrismLogstreamError {
Expand All @@ -201,6 +382,11 @@ impl actix_web::ResponseError for PrismLogstreamError {
PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::StreamError(e) => e.status_code(),
PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ pub struct QueryResponse {
impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records)?;
let mut json_records = record_batches_to_json(&self.records)?;

if self.fill_null {
for map in &mut json_records {
Expand Down
8 changes: 5 additions & 3 deletions src/utils/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ pub fn replace_columns(
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
let mut writer = arrow_json::ArrayWriter::new(buf);
writer.write_batches(records)?;
for record in records {
writer.write(record)?;
}
writer.finish()?;

let buf = writer.into_inner();
Expand Down Expand Up @@ -188,7 +190,7 @@ mod tests {
/// An empty record batch must serialize to an empty JSON row list,
/// not an error or a list containing an empty object.
#[test]
fn check_empty_json_to_record_batches() {
    let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
    let batches = record_batches_to_json(&[empty_batch]).unwrap();
    assert_eq!(batches, vec![]);
}
Expand Down
Loading