From de416465a645e03509f3096a0a1cbf5a90b6d8f8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 4 Dec 2023 19:21:54 +0800 Subject: [PATCH] feat: support `time()` and related functions in PromQL (#2854) * enhance empty_metric Signed-off-by: Ruihang Xia * implementation Signed-off-by: Ruihang Xia * fix lhs & rhs Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * fix typo, update sqlness Signed-off-by: Ruihang Xia * remove deadcode Signed-off-by: Ruihang Xia * add cast to bool modifier Signed-off-by: Ruihang Xia * update sqlness result Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan/empty_metric.rs | 94 +++-- src/promql/src/planner.rs | 217 +++++++++-- .../common/promql/set_operation.result | 4 +- .../standalone/common/promql/time_fn.result | 368 ++++++++++++++++++ .../standalone/common/promql/time_fn.sql | 116 ++++++ 5 files changed, 729 insertions(+), 70 deletions(-) create mode 100644 tests/cases/standalone/common/promql/time_fn.result create mode 100644 tests/cases/standalone/common/promql/time_fn.sql diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs index 0e81f43d489b..f4c33cf30f63 100644 --- a/src/promql/src/extension_plan/empty_metric.rs +++ b/src/promql/src/extension_plan/empty_metric.rs @@ -19,6 +19,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use datafusion::arrow::array::ArrayRef; use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema, TimeUnit}; use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::error::DataFusionError; @@ -48,7 +49,7 @@ pub struct EmptyMetric { start: Millisecond, end: Millisecond, interval: Millisecond, - expr: Expr, + expr: Option, /// Schema that only contains the time index column. /// This is for intermediate result only. time_index_schema: DFSchemaRef, @@ -63,17 +64,20 @@ impl EmptyMetric { interval: Millisecond, time_index_column_name: String, field_column_name: String, - field_expr: Expr, + field_expr: Option, ) -> DataFusionResult { let ts_only_schema = build_ts_only_schema(&time_index_column_name); - let field_data_type = field_expr.get_type(&ts_only_schema)?; - let schema = Arc::new(DFSchema::new_with_metadata( - vec![ - ts_only_schema.field(0).clone(), - DFField::new(Some(""), &field_column_name, field_data_type, true), - ], - HashMap::new(), - )?); + let mut fields = vec![ts_only_schema.field(0).clone()]; + if let Some(field_expr) = &field_expr { + let field_data_type = field_expr.get_type(&ts_only_schema)?; + fields.push(DFField::new( + Some(""), + &field_column_name, + field_data_type, + true, + )); + } + let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?); Ok(Self { start, @@ -94,12 +98,18 @@ impl EmptyMetric { session_state: &SessionState, physical_planner: &dyn PhysicalPlanner, ) -> DataFusionResult> { - let physical_expr = physical_planner.create_physical_expr( - &self.expr, - &self.result_schema, - &ArrowSchema::from(self.result_schema.as_ref()), - session_state, - )?; + let physical_expr = self + .expr + .as_ref() + .map(|expr| { + physical_planner.create_physical_expr( + expr, + &self.result_schema, + &ArrowSchema::from(self.result_schema.as_ref()), + session_state, + ) + }) + .transpose()?; Ok(Arc::new(EmptyMetricExec { start: self.start, @@ -153,7 +163,7 @@ pub struct EmptyMetricExec { time_index_schema: SchemaRef, /// Schema of the output record batch result_schema: SchemaRef, - expr: PhysicalExprRef, + expr: Option, metric: ExecutionPlanMetricsSet, } @@ -241,7 +251,7 @@ pub struct EmptyMetricStream { start: Millisecond, end: Millisecond, interval: Millisecond, - expr: PhysicalExprRef, + expr: Option, /// This stream only generate one record batch at the first poll is_first_poll: bool, /// Schema that only contains the time index column. @@ -272,20 +282,24 @@ impl Stream for EmptyMetricStream { .step_by(self.interval as _) .collect::>(); let time_array = Arc::new(TimestampMillisecondArray::from(time_array)); + let num_rows = time_array.len(); let input_record_batch = RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()]) .map_err(DataFusionError::ArrowError)?; + let mut result_arrays: Vec = vec![time_array]; // evaluate the field expr and get the result - let field_array = self - .expr - .evaluate(&input_record_batch)? - .into_array(time_array.len()); + if let Some(field_expr) = &self.expr { + result_arrays.push( + field_expr + .evaluate(&input_record_batch)? + .into_array(num_rows), + ); + } // assemble the output record batch - let batch = - RecordBatch::try_new(self.result_schema.clone(), vec![time_array, field_array]) - .map_err(DataFusionError::ArrowError); + let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays) + .map_err(DataFusionError::ArrowError); Poll::Ready(Some(batch)) } else { @@ -344,7 +358,7 @@ mod test { interval, time_column_name, field_column_name, - time_expr, + Some(time_expr), ) .unwrap(); let empty_metric_exec = empty_metric @@ -455,4 +469,32 @@ mod test { ) .await } + + #[tokio::test] + async fn no_field_expr() { + let session_context = SessionContext::default(); + let df_default_physical_planner = DefaultPhysicalPlanner::default(); + let empty_metric = + EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap(); + let empty_metric_exec = empty_metric + .to_execution_plan(&session_context.state(), &df_default_physical_planner) + .unwrap(); + + let result = + datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + let expected = String::from( + "+---------------------+\ + \n| time |\ + \n+---------------------+\ + \n| 1970-01-01T00:00:00 |\ + \n+---------------------+", + ); + assert_eq!(result_literal, expected); + } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 7bcc8dabf63f..4ad29d831889 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -44,11 +44,10 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu, - ExpectExprSnafu, ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, - MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, Result, - TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, - UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, - ZeroRangeSelectorSnafu, + ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultipleMetricMatchersSnafu, + MultipleVectorSnafu, NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu, + TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, + UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, @@ -203,7 +202,14 @@ impl PromPlanner { self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; self.ctx.table_name = Some(String::new()); let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - let field_expr = field_expr_builder(lhs, rhs)?; + let mut field_expr = field_expr_builder(lhs, rhs)?; + + if is_comparison_op && should_return_bool { + field_expr = DfExpr::Cast(Cast { + expr: Box::new(field_expr), + data_type: ArrowDataType::Float64, + }); + } LogicalPlan::Extension(Extension { node: Arc::new( @@ -213,15 +219,22 @@ impl PromPlanner { self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), DEFAULT_FIELD_COLUMN.to_string(), - field_expr, + Some(field_expr), ) .context(DataFusionPlanningSnafu)?, ), }) } // lhs is a literal, rhs is a column - (Some(expr), None) => { + (Some(mut expr), None) => { let input = self.prom_expr_to_plan(*rhs.clone()).await?; + // check if the literal is a special time expr + if let Some(time_expr) = Self::try_build_special_time_expr( + lhs, + self.ctx.time_index_column.as_ref().unwrap(), + ) { + expr = time_expr + } let bin_expr_builder = |col: &String| { let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; let mut binary_expr = @@ -242,8 +255,15 @@ impl PromPlanner { } } // lhs is a column, rhs is a literal - (None, Some(expr)) => { + (None, Some(mut expr)) => { let input = self.prom_expr_to_plan(*lhs.clone()).await?; + // check if the literal is a special time expr + if let Some(time_expr) = Self::try_build_special_time_expr( + rhs, + self.ctx.time_index_column.as_ref().unwrap(), + ) { + expr = time_expr + } let bin_expr_builder = |col: &String| { let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; let mut binary_expr = @@ -353,7 +373,7 @@ impl PromPlanner { self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), DEFAULT_FIELD_COLUMN.to_string(), - literal_expr, + Some(literal_expr), ) .context(DataFusionPlanningSnafu)?, ), @@ -373,7 +393,7 @@ impl PromPlanner { self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), DEFAULT_FIELD_COLUMN.to_string(), - literal_expr, + Some(literal_expr), ) .context(DataFusionPlanningSnafu)?, ), @@ -443,28 +463,6 @@ impl PromPlanner { }) } PromExpr::Call(Call { func, args }) => { - // TODO(ruihang): refactor this, transform the AST in advance to include an empty metric table. - if func.name == SPECIAL_TIME_FUNCTION { - self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); - self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.table_name = Some(String::new()); - let time_expr = build_special_time_expr(SPECIAL_TIME_FUNCTION); - - return Ok(LogicalPlan::Extension(Extension { - node: Arc::new( - EmptyMetric::new( - self.ctx.start, - self.ctx.end, - self.ctx.interval, - SPECIAL_TIME_FUNCTION.to_string(), - DEFAULT_FIELD_COLUMN.to_string(), - time_expr, - ) - .context(DataFusionPlanningSnafu)?, - ), - })); - } - if func.name == SPECIAL_HISTOGRAM_QUANTILE { if args.args.len() != 2 { return FunctionInvalidArgumentSnafu { @@ -481,7 +479,6 @@ impl PromPlanner { let input_plan = self.prom_expr_to_plan(input).await?; if !self.ctx.has_le_tag() { - common_telemetry::info!("[DEBUG] valid tags: {:?}", self.ctx.tag_columns); return ColumnNotFoundSnafu { col: LE_COLUMN_NAME.to_string(), } @@ -518,11 +515,25 @@ impl PromPlanner { } let args = self.create_function_args(&args.args)?; - let input = self - .prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu { - expr: prom_expr.clone(), - })?) - .await?; + let input = if let Some(prom_expr) = args.input { + self.prom_expr_to_plan(prom_expr).await? + } else { + self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); + self.ctx.table_name = Some(String::new()); + LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_FIELD_COLUMN.to_string(), + None, + ) + .context(DataFusionPlanningSnafu)?, + ), + }) + }; let mut func_exprs = self.create_function_expr(func, args.literals)?; func_exprs.insert(0, self.create_time_index_column_expr()?); func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); @@ -968,6 +979,7 @@ impl PromPlanner { // TODO(ruihang): set this according to in-param list let field_column_pos = 0; + let mut exprs = Vec::with_capacity(self.ctx.field_columns.len()); let scalar_func = match func.name { "increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf( self.ctx.range.context(ExpectRangeSelectorSnafu)?, @@ -1033,6 +1045,87 @@ impl PromPlanner { }; ScalarFunc::Udf(HoltWinters::scalar_udf(sf_exp, tf_exp)) } + "time" => { + exprs.push(build_special_time_expr( + self.ctx.time_index_column.as_ref().unwrap(), + )); + ScalarFunc::GeneratedExpr + } + "minute" => { + // date_part('minute', time_index) + let expr = self.date_part_on_time_index("minute")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "hour" => { + // date_part('hour', time_index) + let expr = self.date_part_on_time_index("hour")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "month" => { + // date_part('month', time_index) + let expr = self.date_part_on_time_index("month")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "year" => { + // date_part('year', time_index) + let expr = self.date_part_on_time_index("year")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "day_of_month" => { + // date_part('day', time_index) + let expr = self.date_part_on_time_index("day")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "day_of_week" => { + // date_part('dow', time_index) + let expr = self.date_part_on_time_index("dow")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "day_of_year" => { + // date_part('doy', time_index) + let expr = self.date_part_on_time_index("doy")?; + exprs.push(expr); + ScalarFunc::GeneratedExpr + } + "days_in_month" => { + // date_part( + // 'days', + // (date_trunc('month',