Skip to content

feat: API for date_bin query #1093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 16, 2025
24 changes: 14 additions & 10 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use url::Url;

use crate::{
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode}, storage::{AzureBlobConfig, FSConfig, S3Config},
option::{validation, Compression, Mode},
storage::{AzureBlobConfig, FSConfig, S3Config},
};

#[cfg(any(
Expand All @@ -43,7 +44,6 @@ use std::string::String as KafkaSslProtocol;
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";


#[derive(Parser)]
#[command(
name = "parseable",
Expand Down Expand Up @@ -81,11 +81,11 @@ pub struct Cli {
#[derive(Parser)]
pub enum StorageOptions {
#[command(name = "local-store")]
Local(LocalStoreArgs),
Local(LocalStoreArgs),

#[command(name = "s3-store")]
S3(S3StoreArgs),

#[command(name = "blob-store")]
Blob(BlobStoreArgs),
}
Expand Down Expand Up @@ -125,7 +125,7 @@ pub struct Options {

// Server configuration
#[arg(
long,
long,
env = "P_ADDR",
default_value = "0.0.0.0:8000",
value_parser = validation::socket_addr,
Expand Down Expand Up @@ -381,13 +381,13 @@ pub struct Options {
)]
pub audit_logger: Option<Url>,

#[arg(long ,env = "P_AUDIT_USERNAME", help = "Audit logger username")]
#[arg(long, env = "P_AUDIT_USERNAME", help = "Audit logger username")]
pub audit_username: Option<String>,

#[arg(long ,env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
#[arg(long, env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
pub audit_password: Option<String>,

#[arg(long ,env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
pub ms_clarity_tag: Option<String>,
}

Expand All @@ -405,7 +405,11 @@ impl Options {
}

pub fn openid(&self) -> Option<OpenidConfig> {
match (&self.oidc_client_id, &self.oidc_client_secret, &self.oidc_issuer) {
match (
&self.oidc_client_id,
&self.oidc_client_secret,
&self.oidc_issuer,
) {
(Some(id), Some(secret), Some(issuer)) => {
let origin = if let Some(url) = self.domain_address.clone() {
oidc::Origin::Production(url)
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
.service(Server::get_date_bin())
.service(Server::get_correlation_webscope())
.service(Server::get_query_factory())
.service(Server::get_liveness_factory())
Expand Down
5 changes: 5 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl ParseableServer for Server {
.service(Self::get_llm_webscope())
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Self::get_date_bin())
.service(Self::get_metrics_webscope()),
)
.service(Self::get_ingest_otel_factory())
Expand Down Expand Up @@ -266,6 +267,10 @@ impl Server {
),
)
}
pub fn get_date_bin() -> Resource {
web::resource("/datebin")
.route(web::post().to(query::get_date_bin).authorize(Action::Query))
}

// get the query factory
// POST "/query" ==> Get results of the SQL query passed in request body
Expand Down
56 changes: 50 additions & 6 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -39,7 +41,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand All @@ -52,7 +54,7 @@ use crate::utils::user_auth_for_query;
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
Expand All @@ -66,7 +68,7 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = match session_state
.create_logical_plan(&query_request.query)
Expand All @@ -81,11 +83,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// create a visitor to extract the table names present in query
// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

Expand All @@ -103,6 +104,31 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
user_auth_for_query(&permissions, &tables)?;

let time = Instant::now();
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let date_bin_request = DateBinRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let date_bin_records = date_bin_request.get_bin_density().await?;
let response = if query_request.fields {
json!({
"fields": vec![&column_name],
"records": vec![json!({column_name: date_bin_records[0].log_count})]
})
} else {
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
};

let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

return Ok(HttpResponse::Ok().json(response));
}
let (records, fields) = query.execute(table_name.clone()).await?;

let response = QueryResponse {
Expand All @@ -122,6 +148,24 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
Ok(response)
}

pub async fn get_date_bin(
req: HttpRequest,
date_bin: Json<DateBinRequest>,
) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

// does user have access to table?
user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;

let date_bin_records = date_bin.get_bin_density().await?;

Ok(web::Json(DateBinResponse {
fields: vec!["date_bin_timestamp".into(), "log_count".into()],
records: date_bin_records,
}))
}

pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
if CONFIG.options.mode == Mode::Query {
for table in tables {
Expand Down
Loading
Loading