From 2c3fccb516addcfb082a7c12302b3fffafcbbab8 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 14 Aug 2024 19:29:30 +0800 Subject: [PATCH] feat(flow): add `eval_batch` for ScalarExpr (#4551) * refactor: better perf flow * feat(WIP): batching proc * feat: UnaryFunc::eval_batch untested * feat: BinaryFunc::eval_batch untested * feat: VariadicFunc::eval_batch un tested * feat: literal eval_batch * refactor: move DfScalarFunc to separate file * chore: remove unused imports * feat: eval_batch df func&ifthen * chore: remove unused file * refactor: use Batch type * chore: remove unused * chore: remove a done TODO * refactor: per review * chore: import * refactor: eval_batch if then * chore: typo --- Cargo.lock | 1 + src/flow/Cargo.toml | 1 + src/flow/src/compute/render.rs | 34 +- src/flow/src/compute/render/map.rs | 5 +- src/flow/src/compute/render/reduce.rs | 13 +- src/flow/src/compute/render/src_sink.rs | 4 +- src/flow/src/compute/state.rs | 2 +- src/flow/src/compute/types.rs | 5 +- src/flow/src/expr.rs | 88 ++++- src/flow/src/expr/df_func.rs | 293 +++++++++++++++ src/flow/src/expr/error.rs | 8 +- src/flow/src/expr/func.rs | 292 ++++++++++++++- src/flow/src/expr/linear.rs | 9 +- src/flow/src/expr/relation.rs | 1 - src/flow/src/expr/relation/accum.rs | 5 +- src/flow/src/expr/relation/func.rs | 7 +- src/flow/src/expr/scalar.rs | 460 +++++++++++++----------- src/flow/src/heartbeat.rs | 11 +- src/flow/src/lib.rs | 1 - src/flow/src/plan.rs | 10 +- src/flow/src/plan/join.rs | 2 - src/flow/src/plan/reduce.rs | 4 +- src/flow/src/repr.rs | 4 - src/flow/src/repr/relation.rs | 4 +- src/flow/src/server.rs | 18 +- src/flow/src/transform.rs | 23 +- src/flow/src/transform/aggr.rs | 47 +-- src/flow/src/transform/literal.rs | 3 +- src/flow/src/transform/plan.rs | 8 +- src/flow/src/utils.rs | 8 +- 30 files changed, 981 insertions(+), 390 deletions(-) create mode 100644 src/flow/src/expr/df_func.rs diff --git a/Cargo.lock b/Cargo.lock index 34da29c6727b..acd6a6bd8b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3798,6 +3798,7 @@ name = "flow" version = "0.9.1" dependencies = [ "api", + "arrow", "arrow-schema", "async-recursion", "async-trait", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index c4db341dbe7d..ed2a1dc1c474 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -9,6 +9,7 @@ workspace = true [dependencies] api.workspace = true +arrow.workspace = true arrow-schema.workspace = true async-recursion = "1.0" async-trait.workspace = true diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 618f9654257d..444ef7a4ac8c 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -16,32 +16,21 @@ //! //! 
And the [`Context`] is the environment for the render process; it contains all the information the render process needs.

-use std::cell::RefCell;
-use std::collections::{BTreeMap, VecDeque};
-use std::ops::Range;
-use std::rc::Rc;
-
-use datatypes::data_type::ConcreteDataType;
-use datatypes::value::{ListValue, Value};
-use hydroflow::futures::SinkExt;
-use hydroflow::lattices::cc_traits::Get;
+use std::collections::BTreeMap;
+
 use hydroflow::scheduled::graph::Hydroflow;
 use hydroflow::scheduled::graph_ext::GraphExt;
 use hydroflow::scheduled::port::{PortCtx, SEND};
 use itertools::Itertools;
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::OptionExt;
 
 use super::state::Scheduler;
 use crate::compute::state::DataflowState;
-use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
-use crate::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
-use crate::expr::error::{DataTypeSnafu, InternalSnafu};
-use crate::expr::{
-    self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
-};
-use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
-use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
-use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
+use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
+use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
+use crate::expr::{self, GlobalId, LocalId};
+use crate::plan::{Plan, TypedPlan};
+use crate::repr::{self, DiffRow};
 
 mod map;
 mod reduce;
@@ -218,20 +207,17 @@ mod test {
     use std::cell::RefCell;
     use std::rc::Rc;
 
-    use common_time::DateTime;
-    use datatypes::data_type::ConcreteDataType;
     use hydroflow::scheduled::graph::Hydroflow;
     use hydroflow::scheduled::graph_ext::GraphExt;
     use hydroflow::scheduled::handoff::VecHandoff;
-    use pretty_assertions::{assert_eq, assert_ne};
+    use pretty_assertions::assert_eq;
 
     use super::*;
-    use crate::expr::BinaryFunc;
     use crate::repr::Row;
 
     pub fn run_and_check(
         state: &mut DataflowState,
         df: &mut Hydroflow,
-        time_range: Range<repr::Timestamp>,
+        time_range: std::ops::Range<repr::Timestamp>,
         expected: BTreeMap<repr::Timestamp, Vec<DiffRow>>,
         output: Rc<RefCell<Vec<DiffRow>>>,
     ) {
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index 272be4acc684..c940b34ed144 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -24,7 +24,7 @@ use crate::compute::state::Scheduler;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
 use crate::error::{Error, PlanSnafu};
 use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
-use crate::plan::{Plan, TypedPlan};
+use crate::plan::TypedPlan;
 use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
 use crate::utils::ArrangeHandler;
 
@@ -206,8 +206,6 @@
 #[cfg(test)]
 mod test {
-    use std::cell::RefCell;
-    use std::rc::Rc;
 
     use datatypes::data_type::ConcreteDataType;
     use hydroflow::scheduled::graph::Hydroflow;
@@ -216,6 +214,7 @@
     use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
     use crate::compute::state::DataflowState;
     use crate::expr::{self, BinaryFunc, GlobalId};
+    use crate::plan::Plan;
     use crate::repr::{ColumnType, RelationType};
 
     /// test if temporal filter works properly
diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index 5d5761656c84..b41364ec4435 100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -18,17
+18,15 @@ use std::ops::Range; use datatypes::data_type::ConcreteDataType; use datatypes::value::{ListValue, Value}; use hydroflow::scheduled::graph_ext::GraphExt; -use hydroflow::scheduled::port::{PortCtx, SEND}; use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use crate::compute::render::{Context, SubgraphArg}; -use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; use crate::error::{Error, PlanSnafu}; use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; -use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; -use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; +use crate::expr::{EvalError, ScalarExpr}; +use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager}; @@ -790,8 +788,6 @@ fn from_val_to_slice_idx( // TODO(discord9): add tests for accum ser/de #[cfg(test)] mod test { - use std::cell::RefCell; - use std::rc::Rc; use common_time::{DateTime, Interval, Timestamp}; use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT}; @@ -800,7 +796,10 @@ mod test { use super::*; use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check}; use crate::compute::state::DataflowState; - use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc}; + use crate::expr::{ + self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc, + }; + use crate::plan::Plan; use crate::repr::{ColumnType, RelationType}; /// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index c8a8a901c75d..d984f4831191 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, VecDeque}; -use common_telemetry::{debug, info}; +use common_telemetry::debug; use hydroflow::scheduled::graph_ext::GraphExt; use itertools::Itertools; use snafu::OptionExt; @@ -27,7 +27,7 @@ use crate::compute::render::Context; use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff}; use crate::error::{Error, PlanSnafu}; use crate::expr::error::InternalSnafu; -use crate::expr::{EvalError, GlobalId}; +use crate::expr::EvalError; use crate::repr::{DiffRow, Row, BROADCAST_CAP}; #[allow(clippy::mutable_key_type)] diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index a9356005546c..d34b4a311d15 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -13,7 +13,7 @@ // limitations under the License. 
 use std::cell::RefCell;
-use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::collections::{BTreeMap, VecDeque};
 use std::rc::Rc;
 
 use hydroflow::scheduled::graph::Hydroflow;
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index f2276ba755eb..9674163c2686 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -22,12 +22,11 @@ use hydroflow::scheduled::handoff::TeeingHandoff;
 use hydroflow::scheduled::port::RecvPort;
 use hydroflow::scheduled::SubgraphId;
 use itertools::Itertools;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::Mutex;
 
-use crate::compute::render::Context;
 use crate::expr::{EvalError, ScalarExpr};
 use crate::repr::DiffRow;
-use crate::utils::{ArrangeHandler, Arrangement};
+use crate::utils::ArrangeHandler;
 
 pub type Toff = TeeingHandoff<DiffRow>;
diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index aefc4db3beef..35f937cdc136 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -14,6 +14,7 @@
 //! for declaring Expression in dataflow, including map, reduce, id and join(TODO!) etc.
 
+mod df_func;
 pub(crate) mod error;
 mod func;
 mod id;
@@ -22,9 +23,92 @@
 mod relation;
 mod scalar;
 mod signature;
 
-pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
+use datatypes::prelude::DataType;
+use datatypes::vectors::VectorRef;
+pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
+pub(crate) use error::{EvalError, InvalidArgumentSnafu};
 pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
 pub(crate) use id::{GlobalId, Id, LocalId};
+use itertools::Itertools;
 pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
 pub(crate) use relation::{AggregateExpr, AggregateFunc};
-pub(crate) use scalar::{DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr};
+pub(crate) use scalar::{ScalarExpr, TypedExpr};
+use snafu::{ensure, ResultExt};
+
+use crate::expr::error::DataTypeSnafu;
+
+/// A batch of vectors that all share the same length but carry no schema; only useful in dataflow
+pub struct Batch {
+    batch: Vec<VectorRef>,
+    row_count: usize,
+}
+
+impl Batch {
+    pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
+        Self { batch, row_count }
+    }
+
+    pub fn batch(&self) -> &[VectorRef] {
+        &self.batch
+    }
+
+    pub fn row_count(&self) -> usize {
+        self.row_count
+    }
+
+    /// Slices the `Batch`, returning a new `Batch`.
+    ///
+    /// # Panics
+    /// This function panics if `offset + length > self.row_count()`.
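+    ///
+    /// # Example
+    ///
+    /// An illustrative sketch (not part of the original change); it assumes a
+    /// single `Int32Vector` column, mirroring how this patch's tests build batches:
+    ///
+    /// ```ignore
+    /// let col = Int32Vector::from(vec![Some(1), Some(2), Some(3), Some(4)]).slice(0, 4);
+    /// let batch = Batch::new(vec![col], 4);
+    /// let tail = batch.slice(2, 2); // rows 2..4 of every column
+    /// assert_eq!(tail.row_count(), 2);
+    /// ```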
+    pub fn slice(&self, offset: usize, length: usize) -> Batch {
+        let batch = self
+            .batch()
+            .iter()
+            .map(|v| v.slice(offset, length))
+            .collect_vec();
+        Batch::new(batch, length)
+    }
+
+    /// Appends another batch to `self`, column by column.
+    pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
+        ensure!(
+            self.batch.len() == other.batch.len(),
+            InvalidArgumentSnafu {
+                reason: format!(
+                    "Expect two batches to have the same number of columns, found {} and {} columns",
+                    self.batch.len(),
+                    other.batch.len()
+                )
+            }
+        );
+
+        let batch_builders = self
+            .batch
+            .iter()
+            .map(|v| {
+                v.data_type()
+                    .create_mutable_vector(self.row_count() + other.row_count())
+            })
+            .collect_vec();
+
+        let mut result = vec![];
+        let zelf_row_count = self.row_count();
+        let other_row_count = other.row_count();
+        for (idx, mut builder) in batch_builders.into_iter().enumerate() {
+            builder
+                .extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
+                .context(DataTypeSnafu {
+                    msg: "Failed to extend vector",
+                })?;
+            builder
+                .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
+                .context(DataTypeSnafu {
+                    msg: "Failed to extend vector",
+                })?;
+            result.push(builder.to_vector());
+        }
+        self.batch = result;
+        self.row_count = zelf_row_count + other_row_count;
+        Ok(())
+    }
+}
diff --git a/src/flow/src/expr/df_func.rs b/src/flow/src/expr/df_func.rs
new file mode 100644
index 000000000000..b0a2648dd15e
--- /dev/null
+++ b/src/flow/src/expr/df_func.rs
@@ -0,0 +1,293 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Ports DataFusion scalar functions to flow's own scalar functions for use in dataflow
+
+use std::sync::Arc;
+
+use arrow::array::RecordBatchOptions;
+use bytes::BytesMut;
+use common_error::ext::BoxedError;
+use common_recordbatch::DfRecordBatch;
+use common_telemetry::debug;
+use datafusion_physical_expr::PhysicalExpr;
+use datatypes::data_type::DataType;
+use datatypes::value::Value;
+use datatypes::vectors::VectorRef;
+use prost::Message;
+use snafu::{IntoError, ResultExt};
+use substrait::error::{DecodeRelSnafu, EncodeRelSnafu};
+use substrait::substrait_proto_df::proto::expression::ScalarFunction;
+
+use crate::error::Error;
+use crate::expr::error::{
+    ArrowSnafu, DatafusionSnafu as EvalDatafusionSnafu, EvalError, ExternalSnafu,
+    InvalidArgumentSnafu,
+};
+use crate::expr::{Batch, ScalarExpr};
+use crate::repr::RelationDesc;
+use crate::transform::{from_scalar_fn_to_df_fn_impl, FunctionExtensions};
+
+/// A way to represent a scalar function that is implemented in DataFusion
+#[derive(Debug, Clone)]
+pub struct DfScalarFunction {
+    /// The raw bytes of the encoded DataFusion scalar function
+    pub(crate) raw_fn: RawDfScalarFn,
+    // TODO(discord9): directly from datafusion expr
+    /// The implementation of the function
+    pub(crate) fn_impl: Arc<dyn PhysicalExpr>,
+    /// The input schema of the function
+    pub(crate) df_schema: Arc<datafusion_common::DFSchema>,
+}
+
+impl DfScalarFunction {
+    pub fn new(raw_fn: RawDfScalarFn, fn_impl: Arc<dyn PhysicalExpr>) -> Result<Self, Error> {
+        Ok(Self {
+            df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?),
+            raw_fn,
+            fn_impl,
+        })
+    }
+
+    pub async fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result<Self, Error> {
+        Ok(Self {
+            fn_impl: raw_fn.get_fn_impl().await?,
+            df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?),
+            raw_fn,
+        })
+    }
+
+    /// Evaluate the function over a batch: `exprs` are first evaluated against
+    /// `batch` to produce the input columns
+    pub fn eval_batch(&self, batch: &Batch, exprs: &[ScalarExpr]) -> Result<VectorRef, EvalError> {
+        let row_count = batch.row_count();
+        let batch: Vec<_> = exprs
+            .iter()
+            .map(|expr| expr.eval_batch(batch))
+            .collect::<Result<_, _>>()?;
+
+        let schema = self.df_schema.inner().clone();
+
+        let arrays = batch
+            .iter()
+            .map(|array| array.to_arrow_array())
+            .collect::<Vec<_>>();
+        let rb = DfRecordBatch::try_new_with_options(
+            schema,
+            arrays,
+            &RecordBatchOptions::new().with_row_count(Some(row_count)),
+        )
+        .map_err(|err| {
+            ArrowSnafu {
+                context:
+                    "Failed to create RecordBatch from values when eval_batch datafusion scalar function",
+            }
+            .into_error(err)
+        })?;
+
+        let len = rb.num_rows();
+
+        let res = self.fn_impl.evaluate(&rb).map_err(|err| {
+            EvalDatafusionSnafu {
+                raw: err,
+                context: "Failed to evaluate datafusion scalar function",
+            }
+            .build()
+        })?;
+        let res = common_query::columnar_value::ColumnarValue::try_from(&res)
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+        let res_vec = res
+            .try_into_vector(len)
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+
+        Ok(res_vec)
+    }
+
+    /// Evaluate a list of expressions against the input values
+    fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result<Vec<Value>, EvalError> {
+        exprs
+            .iter()
+            .map(|expr| expr.eval(values))
+            .collect::<Result<_, _>>()
+    }
+
+    // TODO(discord9): add RecordBatch support
+    pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
+        // first eval exprs to construct values to feed to datafusion
+        let values: Vec<_> = Self::eval_args(values, exprs)?;
+        if values.is_empty() {
+            return InvalidArgumentSnafu {
+                reason: "values is empty".to_string(),
+            }
+            .fail();
+        }
+        // TODO(discord9): make cols all array length of one
+        let mut cols = vec![];
+        for (idx, typ) in self
+            .raw_fn
+            .input_schema
.typ() + .column_types + .iter() + .enumerate() + { + let typ = typ.scalar_type(); + let mut array = typ.create_mutable_vector(1); + array.push_value_ref(values[idx].as_value_ref()); + cols.push(array.to_vector().to_arrow_array()); + } + let schema = self.df_schema.inner().clone(); + let rb = DfRecordBatch::try_new_with_options( + schema, + cols, + &RecordBatchOptions::new().with_row_count(Some(1)), + ) + .map_err(|err| { + ArrowSnafu { + context: + "Failed to create RecordBatch from values when eval datafusion scalar function", + } + .into_error(err) + })?; + + let res = self.fn_impl.evaluate(&rb).map_err(|err| { + EvalDatafusionSnafu { + raw: err, + context: "Failed to evaluate datafusion scalar function", + } + .build() + })?; + let res = common_query::columnar_value::ColumnarValue::try_from(&res) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let res_vec = res + .try_into_vector(1) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let res_val = res_vec + .try_get(0) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + Ok(res_val) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RawDfScalarFn { + /// The raw bytes encoded datafusion scalar function + pub(crate) f: bytes::BytesMut, + /// The input schema of the function + pub(crate) input_schema: RelationDesc, + /// Extension contains mapping from function reference to function name + pub(crate) extensions: FunctionExtensions, +} + +impl RawDfScalarFn { + pub fn from_proto( + f: &substrait::substrait_proto_df::proto::expression::ScalarFunction, + input_schema: RelationDesc, + extensions: FunctionExtensions, + ) -> Result { + let mut buf = BytesMut::new(); + f.encode(&mut buf) + .context(EncodeRelSnafu) + .map_err(BoxedError::new) + .context(crate::error::ExternalSnafu)?; + Ok(Self { + f: buf, + input_schema, + extensions, + }) + } + async fn get_fn_impl(&self) -> Result, Error> { + let f = ScalarFunction::decode(&mut self.f.as_ref()) + .context(DecodeRelSnafu) + .map_err(BoxedError::new) + .context(crate::error::ExternalSnafu)?; + debug!("Decoded scalar function: {:?}", f); + + let input_schema = &self.input_schema; + let extensions = &self.extensions; + + from_scalar_fn_to_df_fn_impl(&f, input_schema, extensions).await + } +} + +impl std::cmp::PartialEq for DfScalarFunction { + fn eq(&self, other: &Self) -> bool { + self.raw_fn.eq(&other.raw_fn) + } +} + +// can't derive Eq because of Arc not eq, so implement it manually +impl std::cmp::Eq for DfScalarFunction {} + +impl std::cmp::PartialOrd for DfScalarFunction { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl std::cmp::Ord for DfScalarFunction { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.raw_fn.cmp(&other.raw_fn) + } +} +impl std::hash::Hash for DfScalarFunction { + fn hash(&self, state: &mut H) { + self.raw_fn.hash(state); + } +} + +#[cfg(test)] +mod test { + + use datatypes::prelude::ConcreteDataType; + use substrait::substrait_proto_df::proto::expression::literal::LiteralType; + use substrait::substrait_proto_df::proto::expression::{Literal, RexType}; + use substrait::substrait_proto_df::proto::function_argument::ArgType; + use substrait::substrait_proto_df::proto::{Expression, FunctionArgument}; + + use super::*; + use crate::repr::{ColumnType, RelationType}; + + #[tokio::test] + async fn test_df_scalar_function() { + let raw_scalar_func = ScalarFunction { + function_reference: 0, + arguments: vec![FunctionArgument { + arg_type: Some(ArgType::Value(Expression { + 
rex_type: Some(RexType::Literal(Literal { + nullable: false, + type_variation_reference: 0, + literal_type: Some(LiteralType::I64(-1)), + })), + })), + }], + output_type: None, + ..Default::default() + }; + let input_schema = RelationDesc::try_new( + RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::null_datatype(), + )]), + vec!["null_column".to_string()], + ) + .unwrap(); + let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]); + let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap(); + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).await.unwrap(); + assert_eq!( + df_func + .eval(&[Value::Null], &[ScalarExpr::Column(0)]) + .unwrap(), + Value::Int64(1) + ); + } +} diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index ff1765df49fd..6703ce240471 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -14,17 +14,12 @@ //! Error handling for expression evaluation. -use std::any::Any; - use arrow_schema::ArrowError; use common_error::ext::BoxedError; use common_macro::stack_trace_debug; -use common_telemetry::common_error::ext::ErrorExt; -use common_telemetry::common_error::status_code::StatusCode; use datafusion_common::DataFusionError; use datatypes::data_type::ConcreteDataType; -use serde::{Deserialize, Serialize}; -use snafu::{Location, ResultExt, Snafu}; +use snafu::{Location, Snafu}; fn is_send_sync() { fn check() {} @@ -113,6 +108,7 @@ pub enum EvalError { #[snafu(display("Arrow error: {raw:?}, context: {context}"))] Arrow { + #[snafu(source)] raw: ArrowError, context: String, #[snafu(implicit)] diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index ba8cdba71c70..143f1a82dda3 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -15,17 +15,20 @@ //! This module contains the definition of functions that can be used in expressions. 
 use std::collections::HashMap;
-use std::sync::OnceLock;
+use std::sync::{Arc, OnceLock};
 
+use arrow::array::{ArrayRef, BooleanArray};
 use common_error::ext::BoxedError;
-use common_telemetry::debug;
 use common_time::timestamp::TimeUnit;
 use common_time::{DateTime, Timestamp};
 use datafusion_expr::Operator;
 use datatypes::data_type::ConcreteDataType;
+use datatypes::prelude::DataType;
 use datatypes::types::cast;
-use datatypes::types::cast::CastOption;
 use datatypes::value::Value;
+use datatypes::vectors::{
+    BooleanVector, DateTimeVector, Helper, TimestampMillisecondVector, VectorRef,
+};
 use serde::{Deserialize, Serialize};
 use smallvec::smallvec;
 use snafu::{ensure, OptionExt, ResultExt};
@@ -34,12 +37,12 @@ use substrait::df_logical_plan::consumer::name_to_op;
 
 use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
 use crate::expr::error::{
-    CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu,
+    ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
     TryFromValueSnafu, TypeMismatchSnafu,
 };
 use crate::expr::signature::{GenericFn, Signature};
-use crate::expr::{InvalidArgumentSnafu, ScalarExpr, TypedExpr};
-use crate::repr::{self, value_to_internal_ts, Row};
+use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr};
+use crate::repr::{self, value_to_internal_ts};
 
 /// UnmaterializableFunc is a function that can't be evaluated independently,
 /// and requires special handling
@@ -221,6 +224,129 @@ impl UnaryFunc {
         }
     }
 
+    pub fn eval_batch(&self, batch: &Batch, expr: &ScalarExpr) -> Result<VectorRef, EvalError> {
+        let arg_col = expr.eval_batch(batch)?;
+        match self {
+            Self::Not => {
+                let arrow_array = arg_col.to_arrow_array();
+                let bool_array = arrow_array
+                    .as_any()
+                    .downcast_ref::<BooleanArray>()
+                    .context({
+                        TypeMismatchSnafu {
+                            expected: ConcreteDataType::boolean_datatype(),
+                            actual: arg_col.data_type(),
+                        }
+                    })?;
+                let ret = arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
+                let ret = BooleanVector::from(ret);
+                Ok(Arc::new(ret))
+            }
+            Self::IsNull => {
+                let arrow_array = arg_col.to_arrow_array();
+                let ret = arrow::compute::is_null(&arrow_array)
+                    .context(ArrowSnafu { context: "is_null" })?;
+                let ret = BooleanVector::from(ret);
+                Ok(Arc::new(ret))
+            }
+            Self::IsTrue | Self::IsFalse => {
+                let arrow_array = arg_col.to_arrow_array();
+                let bool_array = arrow_array
+                    .as_any()
+                    .downcast_ref::<BooleanArray>()
+                    .context({
+                        TypeMismatchSnafu {
+                            expected: ConcreteDataType::boolean_datatype(),
+                            actual: arg_col.data_type(),
+                        }
+                    })?;
+
+                if matches!(self, Self::IsTrue) {
+                    Ok(Arc::new(BooleanVector::from(bool_array.clone())))
+                } else {
+                    let ret =
+                        arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
+                    Ok(Arc::new(BooleanVector::from(ret)))
+                }
+            }
+            Self::StepTimestamp => {
+                let datetime_array = get_datetime_array(&arg_col)?;
+                let date_array_ref = datetime_array
+                    .as_any()
+                    .downcast_ref::<arrow::array::Date64Array>()
+                    .context({
+                        TypeMismatchSnafu {
+                            expected: ConcreteDataType::datetime_datatype(),
+                            actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
+                        }
+                    })?;
+
+                let ret = arrow::compute::unary(date_array_ref, |arr| arr + 1);
+                let ret = DateTimeVector::from(ret);
+                Ok(Arc::new(ret))
+            }
+            Self::Cast(to) => {
+                let arrow_array = arg_col.to_arrow_array();
+                let ret = arrow::compute::cast(&arrow_array, &to.as_arrow_type())
+                    .context(ArrowSnafu { context: "cast" })?;
+                let vector = Helper::try_into_vector(ret).context(DataTypeSnafu {
+                    msg: "Failed to convert to Vector",
+                })?;
+                Ok(vector)
+            }
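+            // Note: the two tumble-window arms below share the same shape: coerce
+            // the input column to a datetime array, then map each timestamp to its
+            // window start (floor) or window end (ceiling) via `get_window_start`.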
+            Self::TumbleWindowFloor {
+                window_size,
+                start_time,
+            } => {
+                let datetime_array = get_datetime_array(&arg_col)?;
+                let date_array_ref = datetime_array
+                    .as_any()
+                    .downcast_ref::<arrow::array::Date64Array>()
+                    .context({
+                        TypeMismatchSnafu {
+                            expected: ConcreteDataType::datetime_datatype(),
+                            actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
+                        }
+                    })?;
+
+                let start_time = start_time.map(|t| t.val());
+                let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
+
+                let ret = arrow::compute::unary(date_array_ref, |ts| {
+                    get_window_start(ts, window_size, start_time)
+                });
+
+                let ret = TimestampMillisecondVector::from(ret);
+                Ok(Arc::new(ret))
+            }
+            Self::TumbleWindowCeiling {
+                window_size,
+                start_time,
+            } => {
+                let datetime_array = get_datetime_array(&arg_col)?;
+                let date_array_ref = datetime_array
+                    .as_any()
+                    .downcast_ref::<arrow::array::Date64Array>()
+                    .context({
+                        TypeMismatchSnafu {
+                            expected: ConcreteDataType::datetime_datatype(),
+                            actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
+                        }
+                    })?;
+
+                let start_time = start_time.map(|t| t.val());
+                let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
+
+                let ret = arrow::compute::unary(date_array_ref, |ts| {
+                    get_window_start(ts, window_size, start_time) + window_size
+                });
+
+                let ret = TimestampMillisecondVector::from(ret);
+                Ok(Arc::new(ret))
+            }
+        }
+    }
+
     /// Evaluate the function with given values and expression
     ///
     /// # Arguments
@@ -314,6 +440,23 @@
     }
 }
 
+fn get_datetime_array(vector: &VectorRef) -> Result<ArrayRef, EvalError> {
+    let arrow_array = vector.to_arrow_array();
+    let datetime_array =
+        if *arrow_array.data_type() == ConcreteDataType::datetime_datatype().as_arrow_type() {
+            arrow_array
+        } else {
+            arrow::compute::cast(
+                &arrow_array,
+                &ConcreteDataType::datetime_datatype().as_arrow_type(),
+            )
+            .context(ArrowSnafu {
+                context: "Trying to cast to datetime in StepTimestamp",
+            })?
+        };
+    Ok(datetime_array)
+}
+
 fn get_window_start(
     ts: repr::Timestamp,
     window_size: repr::Duration,
@@ -692,6 +835,98 @@ impl BinaryFunc {
         Ok((spec_fn, signature))
     }
 
+    pub fn eval_batch(
+        &self,
+        batch: &Batch,
+        expr1: &ScalarExpr,
+        expr2: &ScalarExpr,
+    ) -> Result<VectorRef, EvalError> {
+        let left = expr1.eval_batch(batch)?;
+        let left = left.to_arrow_array();
+        let right = expr2.eval_batch(batch)?;
+        let right = right.to_arrow_array();
+
+        let arrow_array: ArrayRef = match self {
+            Self::Eq => Arc::new(
+                arrow::compute::kernels::cmp::eq(&left, &right)
+                    .context(ArrowSnafu { context: "eq" })?,
+            ),
+            Self::NotEq => Arc::new(
+                arrow::compute::kernels::cmp::neq(&left, &right)
+                    .context(ArrowSnafu { context: "neq" })?,
+            ),
+            Self::Lt => Arc::new(
+                arrow::compute::kernels::cmp::lt(&left, &right)
+                    .context(ArrowSnafu { context: "lt" })?,
+            ),
+            Self::Lte => Arc::new(
+                arrow::compute::kernels::cmp::lt_eq(&left, &right)
+                    .context(ArrowSnafu { context: "lte" })?,
+            ),
+            Self::Gt => Arc::new(
+                arrow::compute::kernels::cmp::gt(&left, &right)
+                    .context(ArrowSnafu { context: "gt" })?,
+            ),
+            Self::Gte => Arc::new(
+                arrow::compute::kernels::cmp::gt_eq(&left, &right)
+                    .context(ArrowSnafu { context: "gte" })?,
+            ),
+
+            Self::AddInt16
+            | Self::AddInt32
+            | Self::AddInt64
+            | Self::AddUInt16
+            | Self::AddUInt32
+            | Self::AddUInt64
+            | Self::AddFloat32
+            | Self::AddFloat64 => arrow::compute::kernels::numeric::add(&left, &right)
+                .context(ArrowSnafu { context: "add" })?,
+
+            Self::SubInt16
+            | Self::SubInt32
+            | Self::SubInt64
+            | Self::SubUInt16
+            | Self::SubUInt32
+            | Self::SubUInt64
+            | Self::SubFloat32
+            | Self::SubFloat64 => arrow::compute::kernels::numeric::sub(&left, &right)
+                .context(ArrowSnafu { context: "sub" })?,
+
+            Self::MulInt16
+            | Self::MulInt32
+            | Self::MulInt64
+            | Self::MulUInt16
+            | Self::MulUInt32
+            | Self::MulUInt64
+            | Self::MulFloat32
+            | Self::MulFloat64 => arrow::compute::kernels::numeric::mul(&left, &right)
+                .context(ArrowSnafu { context: "mul" })?,
+
+            Self::DivInt16
+            | Self::DivInt32
+            | Self::DivInt64
+            | Self::DivUInt16
+            | Self::DivUInt32
+            | Self::DivUInt64
+            | Self::DivFloat32
+            | Self::DivFloat64 => arrow::compute::kernels::numeric::div(&left, &right)
+                .context(ArrowSnafu { context: "div" })?,
+
+            Self::ModInt16
+            | Self::ModInt32
+            | Self::ModInt64
+            | Self::ModUInt16
+            | Self::ModUInt32
+            | Self::ModUInt64 => arrow::compute::kernels::numeric::rem(&left, &right)
+                .context(ArrowSnafu { context: "rem" })?,
+        };
+
+        let vector = Helper::try_into_vector(arrow_array).context(DataTypeSnafu {
+            msg: "Failed to convert to Vector",
+        })?;
+        Ok(vector)
+    }
+
     /// Evaluate the function with given values and expression
     ///
     /// # Arguments
@@ -824,6 +1059,51 @@ impl VariadicFunc {
         }
     }
 
+    pub fn eval_batch(&self, batch: &Batch, exprs: &[ScalarExpr]) -> Result<VectorRef, EvalError> {
+        ensure!(
+            !exprs.is_empty(),
+            InvalidArgumentSnafu {
+                reason: format!("Variadic function {:?} requires at least 1 argument", self)
+            }
+        );
+        let args = exprs
+            .iter()
+            .map(|expr| expr.eval_batch(batch).map(|v| v.to_arrow_array()))
+            .collect::<Result<Vec<_>, _>>()?;
+        let mut iter = args.into_iter();
+
+        let first = iter.next().unwrap();
+        let mut left = first
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .context({
+                TypeMismatchSnafu {
+                    expected: ConcreteDataType::boolean_datatype(),
+                    actual: ConcreteDataType::from_arrow_type(first.data_type()),
+                }
+            })?
+ .clone(); + + for right in iter { + let right = right.as_any().downcast_ref::().context({ + TypeMismatchSnafu { + expected: ConcreteDataType::boolean_datatype(), + actual: ConcreteDataType::from_arrow_type(right.data_type()), + } + })?; + left = match self { + Self::And => { + arrow::compute::and(&left, right).context(ArrowSnafu { context: "and" })? + } + Self::Or => { + arrow::compute::or(&left, right).context(ArrowSnafu { context: "or" })? + } + } + } + + Ok(Arc::new(BooleanVector::from(left))) + } + /// Evaluate the function with given values and expressions pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { match self { diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index b61ff944daa2..234ae12cef14 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -14,17 +14,15 @@ //! define MapFilterProject which is a compound operator that can be applied row-by-row. -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet}; use common_telemetry::debug; use datatypes::value::Value; -use itertools::Itertools; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt}; +use snafu::ensure; use crate::error::{Error, InvalidQuerySnafu}; use crate::expr::error::{EvalError, InternalSnafu}; -use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr}; +use crate::expr::{InvalidArgumentSnafu, ScalarExpr}; use crate::repr::{self, value_to_internal_ts, Diff, Row}; /// A compound operator that can be applied row-by-row. @@ -738,7 +736,6 @@ impl MfpPlan { #[cfg(test)] mod test { use datatypes::data_type::ConcreteDataType; - use itertools::Itertools; use super::*; use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc}; diff --git a/src/flow/src/expr/relation.rs b/src/flow/src/expr/relation.rs index 661f716dcd29..3661db4ff0f2 100644 --- a/src/flow/src/expr/relation.rs +++ b/src/flow/src/expr/relation.rs @@ -15,7 +15,6 @@ //! Describes an aggregation function and it's input expression. 
pub(crate) use func::AggregateFunc; -use serde::{Deserialize, Serialize}; use crate::expr::ScalarExpr; diff --git a/src/flow/src/expr/relation/accum.rs b/src/flow/src/expr/relation/accum.rs index c9affae7601d..252913de56f6 100644 --- a/src/flow/src/expr/relation/accum.rs +++ b/src/flow/src/expr/relation/accum.rs @@ -24,11 +24,9 @@ use std::any::type_name; use std::fmt::Display; use common_decimal::Decimal128; -use common_time::{Date, DateTime}; use datatypes::data_type::ConcreteDataType; use datatypes::value::{OrderedF32, OrderedF64, OrderedFloat, Value}; use enum_dispatch::enum_dispatch; -use hydroflow::futures::stream::Concat; use serde::{Deserialize, Serialize}; use snafu::ensure; @@ -761,7 +759,10 @@ fn ty_eq_without_precision(left: ConcreteDataType, right: ConcreteDataType) -> b #[allow(clippy::too_many_lines)] #[cfg(test)] mod test { + use common_time::DateTime; + use super::*; + #[test] fn test_accum() { let testcases = vec![ diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index 5307a6aedb3a..868d83b43f02 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -16,16 +16,15 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::OnceLock; -use common_time::{Date, DateTime}; use datatypes::prelude::ConcreteDataType; -use datatypes::value::{OrderedF32, OrderedF64, Value}; +use datatypes::value::Value; use serde::{Deserialize, Serialize}; use smallvec::smallvec; -use snafu::{IntoError, OptionExt, ResultExt}; +use snafu::{IntoError, OptionExt}; use strum::{EnumIter, IntoEnumIterator}; use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu}; -use crate::expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu}; +use crate::expr::error::EvalError; use crate::expr::relation::accum::{Accum, Accumulator}; use crate::expr::signature::{GenericFn, Signature}; use crate::repr::Diff; diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 8a3290a932f1..b582c75114a1 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -15,34 +15,22 @@ //! Scalar expressions. 
use std::collections::{BTreeMap, BTreeSet}; -use std::sync::{Arc, Mutex}; -use bytes::BytesMut; use common_error::ext::BoxedError; -use common_recordbatch::DfRecordBatch; -use common_telemetry::debug; -use datafusion_physical_expr::PhysicalExpr; -use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; +use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::value::Value; -use datatypes::{arrow_array, value}; -use prost::Message; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; -use substrait::error::{DecodeRelSnafu, EncodeRelSnafu}; -use substrait::substrait_proto_df::proto::expression::{RexType, ScalarFunction}; -use substrait::substrait_proto_df::proto::Expression; +use datatypes::vectors::{BooleanVector, Helper, NullVector, Vector, VectorRef}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ DatafusionSnafu, Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu, }; use crate::expr::error::{ - ArrowSnafu, DatafusionSnafu as EvalDatafusionSnafu, EvalError, ExternalSnafu, - InvalidArgumentSnafu, OptimizeSnafu, + DataTypeSnafu, EvalError, InternalSnafu, InvalidArgumentSnafu, OptimizeSnafu, TypeMismatchSnafu, }; use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; -use crate::repr::{ColumnType, RelationDesc, RelationType}; -use crate::transform::{from_scalar_fn_to_df_fn_impl, FunctionExtensions}; +use crate::expr::{Batch, DfScalarFunction}; +use crate::repr::{ColumnType, RelationType}; /// A scalar expression with a known type. #[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)] pub struct TypedExpr { @@ -174,163 +162,6 @@ pub enum ScalarExpr { }, } -/// A way to represent a scalar function that is implemented in Datafusion -#[derive(Debug, Clone)] -pub struct DfScalarFunction { - raw_fn: RawDfScalarFn, - // TODO(discord9): directly from datafusion expr - fn_impl: Arc, - df_schema: Arc, -} - -impl DfScalarFunction { - pub fn new(raw_fn: RawDfScalarFn, fn_impl: Arc) -> Result { - Ok(Self { - df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?), - raw_fn, - fn_impl, - }) - } - - pub async fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result { - Ok(Self { - fn_impl: raw_fn.get_fn_impl().await?, - df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?), - raw_fn, - }) - } - - /// eval a list of expressions using input values - fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result, EvalError> { - exprs - .iter() - .map(|expr| expr.eval(values)) - .collect::>() - } - - // TODO(discord9): add RecordBatch support - pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { - // first eval exprs to construct values to feed to datafusion - let values: Vec<_> = Self::eval_args(values, exprs)?; - if values.is_empty() { - return InvalidArgumentSnafu { - reason: "values is empty".to_string(), - } - .fail(); - } - // TODO(discord9): make cols all array length of one - let mut cols = vec![]; - for (idx, typ) in self - .raw_fn - .input_schema - .typ() - .column_types - .iter() - .enumerate() - { - let typ = typ.scalar_type(); - let mut array = typ.create_mutable_vector(1); - array.push_value_ref(values[idx].as_value_ref()); - cols.push(array.to_vector().to_arrow_array()); - } - let schema = self.df_schema.inner().clone(); - let rb = DfRecordBatch::try_new(schema, cols).map_err(|err| { - ArrowSnafu { - raw: err, - context: - "Failed to create RecordBatch from values when eval datafusion scalar function", - } - .build() - })?; - - let res = 
self.fn_impl.evaluate(&rb).map_err(|err| { - EvalDatafusionSnafu { - raw: err, - context: "Failed to evaluate datafusion scalar function", - } - .build() - })?; - let res = common_query::columnar_value::ColumnarValue::try_from(&res) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let res_vec = res - .try_into_vector(1) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let res_val = res_vec - .try_get(0) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - Ok(res_val) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct RawDfScalarFn { - /// The raw bytes encoded datafusion scalar function - pub(crate) f: bytes::BytesMut, - /// The input schema of the function - pub(crate) input_schema: RelationDesc, - /// Extension contains mapping from function reference to function name - pub(crate) extensions: FunctionExtensions, -} - -impl RawDfScalarFn { - pub fn from_proto( - f: &substrait::substrait_proto_df::proto::expression::ScalarFunction, - input_schema: RelationDesc, - extensions: FunctionExtensions, - ) -> Result { - let mut buf = BytesMut::new(); - f.encode(&mut buf) - .context(EncodeRelSnafu) - .map_err(BoxedError::new) - .context(crate::error::ExternalSnafu)?; - Ok(Self { - f: buf, - input_schema, - extensions, - }) - } - async fn get_fn_impl(&self) -> Result, Error> { - let f = ScalarFunction::decode(&mut self.f.as_ref()) - .context(DecodeRelSnafu) - .map_err(BoxedError::new) - .context(crate::error::ExternalSnafu)?; - debug!("Decoded scalar function: {:?}", f); - - let input_schema = &self.input_schema; - let extensions = &self.extensions; - - from_scalar_fn_to_df_fn_impl(&f, input_schema, extensions).await - } -} - -impl std::cmp::PartialEq for DfScalarFunction { - fn eq(&self, other: &Self) -> bool { - self.raw_fn.eq(&other.raw_fn) - } -} - -// can't derive Eq because of Arc not eq, so implement it manually -impl std::cmp::Eq for DfScalarFunction {} - -impl std::cmp::PartialOrd for DfScalarFunction { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} -impl std::cmp::Ord for DfScalarFunction { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.raw_fn.cmp(&other.raw_fn) - } -} -impl std::hash::Hash for DfScalarFunction { - fn hash(&self, state: &mut H) { - self.raw_fn.hash(state); - } -} - impl ScalarExpr { pub fn with_type(self, typ: ColumnType) -> TypedExpr { TypedExpr::new(self, typ) @@ -428,6 +259,177 @@ impl ScalarExpr { } } + pub fn eval_batch(&self, batch: &Batch) -> Result { + match self { + ScalarExpr::Column(i) => Ok(batch.batch()[*i].clone()), + ScalarExpr::Literal(val, dt) => Ok(Helper::try_from_scalar_value( + val.try_to_scalar_value(dt).context(DataTypeSnafu { + msg: "Failed to convert literal to scalar value", + })?, + batch.row_count(), + ) + .context(DataTypeSnafu { + msg: "Failed to convert scalar value to vector ref when parsing literal", + })?), + ScalarExpr::CallUnmaterializable(_) => OptimizeSnafu { + reason: "Can't eval unmaterializable function", + } + .fail()?, + ScalarExpr::CallUnary { func, expr } => func.eval_batch(batch, expr), + ScalarExpr::CallBinary { func, expr1, expr2 } => func.eval_batch(batch, expr1, expr2), + ScalarExpr::CallVariadic { func, exprs } => func.eval_batch(batch, exprs), + ScalarExpr::CallDf { + df_scalar_fn, + exprs, + } => df_scalar_fn.eval_batch(batch, exprs), + ScalarExpr::If { cond, then, els } => Self::eval_if_then(batch, cond, then, els), + } + } + + fn eval_if_then( + batch: &Batch, + cond: &ScalarExpr, + then: &ScalarExpr, + els: 
&ScalarExpr,
+    ) -> Result<VectorRef, EvalError> {
+        let conds = cond.eval_batch(batch)?;
+        let bool_conds = conds
+            .as_any()
+            .downcast_ref::<BooleanVector>()
+            .context({
+                TypeMismatchSnafu {
+                    expected: ConcreteDataType::boolean_datatype(),
+                    actual: conds.data_type(),
+                }
+            })?
+            .as_boolean_array();
+
+        let mut then_input_batch = None;
+        let mut else_input_batch = None;
+        let mut null_input_batch = None;
+
+        // instructions for how to reassemble the result vector:
+        // iterate over (which branch, offset, length) tuples and append the
+        // corresponding output slice to the resulting vector
+        let mut assembly_idx = vec![];
+
+        // append a batch, returning the appended batch's slice as (offset, length)
+        fn append_batch(
+            batch: &mut Option<Batch>,
+            to_be_append: Batch,
+        ) -> Result<(usize, usize), EvalError> {
+            let len = to_be_append.row_count();
+            if let Some(batch) = batch {
+                let offset = batch.row_count();
+                batch.append_batch(to_be_append)?;
+                Ok((offset, len))
+            } else {
+                *batch = Some(to_be_append);
+                Ok((0, len))
+            }
+        }
+
+        let mut prev_cond: Option<Option<bool>> = None;
+        let mut prev_start_idx: Option<usize> = None;
+        // first, split the input into per-condition batches
+        for (idx, cond) in bool_conds.iter().enumerate() {
+            // still inside the same run of equal conditions, keep scanning
+            if prev_cond == Some(cond) {
+                continue;
+            } else if let Some(prev_cond_idx) = prev_start_idx {
+                let prev_cond = prev_cond.unwrap();
+
+                // flush the previous run into its corresponding batch
+                let slice_offset = prev_cond_idx;
+                let slice_length = idx - prev_cond_idx;
+                let to_be_append = batch.slice(slice_offset, slice_length);
+
+                let to_put_back = match prev_cond {
+                    Some(true) => (
+                        Some(true),
+                        append_batch(&mut then_input_batch, to_be_append)?,
+                    ),
+                    Some(false) => (
+                        Some(false),
+                        append_batch(&mut else_input_batch, to_be_append)?,
+                    ),
+                    None => (None, append_batch(&mut null_input_batch, to_be_append)?),
+                };
+                assembly_idx.push(to_put_back);
+            }
+            prev_cond = Some(cond);
+            prev_start_idx = Some(idx);
+        }
+
+        // flush the last run (for an empty input there is nothing to flush)
+        if let Some(slice_offset) = prev_start_idx {
+            let prev_cond = prev_cond.unwrap();
+            let slice_length = bool_conds.len() - slice_offset;
+            let to_be_append = batch.slice(slice_offset, slice_length);
+            let to_put_back = match prev_cond {
+                Some(true) => (
+                    Some(true),
+                    append_batch(&mut then_input_batch, to_be_append)?,
+                ),
+                Some(false) => (
+                    Some(false),
+                    append_batch(&mut else_input_batch, to_be_append)?,
+                ),
+                None => (None, append_batch(&mut null_input_batch, to_be_append)?),
+            };
+            assembly_idx.push(to_put_back);
+        }
+
+        let then_output_vec = then_input_batch
+            .map(|batch| then.eval_batch(&batch))
+            .transpose()?;
+        let else_output_vec = else_input_batch
+            .map(|batch| els.eval_batch(&batch))
+            .transpose()?;
+        let null_output_vec = null_input_batch
+            .map(|null| NullVector::new(null.row_count()).slice(0, null.row_count()));
+
+        let dt = then_output_vec
+            .as_ref()
+            .map(|v| v.data_type())
+            .or(else_output_vec.as_ref().map(|v| v.data_type()))
+            .unwrap_or(ConcreteDataType::null_datatype());
+        let mut builder = dt.create_mutable_vector(conds.len());
+        for (cond, (offset, length)) in assembly_idx {
+            let slice = match cond {
+                Some(true) => then_output_vec.as_ref(),
+                Some(false) => else_output_vec.as_ref(),
+                None => null_output_vec.as_ref(),
+            }
+            .context(InternalSnafu {
+                reason: "Expect corresponding output vector to exist",
+            })?;
+            // TODO(discord9): seems `extend_slice_of` doesn't support NullVector or ConstantVector,
+            // consider adding support for that
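+            // Three cases when copying an output slice back into the result builder:
+            // an all-null slice is pushed as nulls, a constant slice is materialized
+            // into a plain vector first, and any other vector is copied directly via
+            // `extend_slice_of`.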
+ if slice.data_type().is_null() { + builder.push_nulls(length); + } else if slice.is_const() { + let arr = slice.slice(offset, length).to_arrow_array(); + let vector = Helper::try_into_vector(arr).context(DataTypeSnafu { + msg: "Failed to convert arrow array to vector", + })?; + builder + .extend_slice_of(vector.as_ref(), 0, vector.len()) + .context(DataTypeSnafu { + msg: "Failed to build result vector for if-then expression", + })?; + } else { + builder + .extend_slice_of(slice.as_ref(), offset, length) + .context(DataTypeSnafu { + msg: "Failed to build result vector for if-then expression", + })?; + } + } + let result_vec = builder.to_vector(); + + Ok(result_vec) + } + /// Eval this expression with the given values. pub fn eval(&self, values: &[Value]) -> Result { match self { @@ -747,18 +749,11 @@ impl ScalarExpr { #[cfg(test)] mod test { - use datatypes::arrow::array::Scalar; - use query::parser::QueryLanguageParser; - use query::QueryEngine; - use session::context::QueryContext; - use substrait::extension_serializer; - use substrait::substrait_proto_df::proto::expression::literal::LiteralType; - use substrait::substrait_proto_df::proto::expression::Literal; - use substrait::substrait_proto_df::proto::function_argument::ArgType; - use substrait::substrait_proto_df::proto::r#type::Kind; - use substrait::substrait_proto_df::proto::{r#type, FunctionArgument, Type}; + use datatypes::vectors::Int32Vector; + use pretty_assertions::assert_eq; use super::*; + #[test] fn test_extract_bound() { let test_list: [(ScalarExpr, Result<_, EvalError>); 5] = [ @@ -849,37 +844,68 @@ mod test { assert!(matches!(res, Err(Error::InvalidQuery { .. }))); } - #[tokio::test] - async fn test_df_scalar_function() { - let raw_scalar_func = ScalarFunction { - function_reference: 0, - arguments: vec![FunctionArgument { - arg_type: Some(ArgType::Value(Expression { - rex_type: Some(RexType::Literal(Literal { - nullable: false, - type_variation_reference: 0, - literal_type: Some(LiteralType::I64(-1)), - })), - })), - }], - output_type: None, - ..Default::default() - }; - let input_schema = RelationDesc::try_new( - RelationType::new(vec![ColumnType::new_nullable( - ConcreteDataType::null_datatype(), - )]), - vec!["null_column".to_string()], - ) - .unwrap(); - let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]); - let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap(); - let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).await.unwrap(); - assert_eq!( - df_func - .eval(&[Value::Null], &[ScalarExpr::Column(0)]) - .unwrap(), - Value::Int64(1) - ); + #[test] + fn test_eval_batch() { + // TODO(discord9): add more tests + { + let expr = ScalarExpr::If { + cond: Box::new(ScalarExpr::Column(0).call_binary( + ScalarExpr::literal(Value::from(0), ConcreteDataType::int32_datatype()), + BinaryFunc::Eq, + )), + then: Box::new(ScalarExpr::literal( + Value::from(42), + ConcreteDataType::int32_datatype(), + )), + els: Box::new(ScalarExpr::literal( + Value::from(37), + ConcreteDataType::int32_datatype(), + )), + }; + let raw = vec![ + None, + Some(0), + Some(1), + None, + None, + Some(0), + Some(0), + Some(1), + Some(1), + ]; + let raw_len = raw.len(); + let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)]; + + let batch = Batch::new(vectors, raw_len); + let expected = Int32Vector::from(vec![ + None, + Some(42), + Some(37), + None, + None, + Some(42), + Some(42), + Some(37), + Some(37), + ]) + .slice(0, raw_len); + assert_eq!(expr.eval_batch(&batch).unwrap(), expected); + + 
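+            // a single-row batch exercises the "last run" path of `eval_if_then`,
+            // and an empty batch should yield an empty result vector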
+            let raw = vec![Some(0)];
+            let raw_len = raw.len();
+            let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
+
+            let batch = Batch::new(vectors, raw_len);
+            let expected = Int32Vector::from(vec![Some(42)]).slice(0, raw_len);
+            assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
+
+            let raw: Vec<Option<i32>> = vec![];
+            let raw_len = raw.len();
+            let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
+
+            let batch = Batch::new(vectors, raw_len);
+            let expected = NullVector::new(raw_len).slice(0, raw_len);
+            assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
+        }
     }
 }
diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs
index a48230a89883..96635e350dde 100644
--- a/src/flow/src/heartbeat.rs
+++ b/src/flow/src/heartbeat.rs
@@ -19,24 +19,21 @@ use std::sync::Arc;
 
 use api::v1::meta::{HeartbeatRequest, Peer};
 use common_error::ext::BoxedError;
-use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::heartbeat::handler::{
-    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
+    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
 };
 use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
 use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
 use common_telemetry::{debug, error, info, warn};
 use greptime_proto::v1::meta::NodeInfo;
-use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient, MetaClientBuilder};
-use meta_client::{MetaClientOptions, MetaClientType};
+use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
 use servers::addrs;
 use servers::heartbeat_options::HeartbeatOptions;
 use snafu::ResultExt;
 use tokio::sync::mpsc;
-use tokio::time::{Duration, Instant};
+use tokio::time::Duration;
 
-use crate::error::{ExternalSnafu, MetaClientInitSnafu};
+use crate::error::ExternalSnafu;
 use crate::{Error, FlownodeOptions};
 
 /// The flownode heartbeat task, which sends [`HeartbeatRequest`]s to Metasrv periodically in the background.
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index 99d6773789a2..738ed524ba04 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -19,7 +19,6 @@
 #![feature(let_chains)]
 #![feature(duration_abs_diff)]
 #![allow(dead_code)]
-#![allow(unused_imports)]
 #![warn(clippy::missing_docs_in_private_items)]
 #![warn(clippy::too_many_lines)]
 // allow unused for now because it should be used later
diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs
index c31ddb652e3b..dec70324f9fd 100644
--- a/src/flow/src/plan.rs
+++ b/src/flow/src/plan.rs
@@ -20,17 +20,11 @@ mod reduce;
 
 use std::collections::BTreeSet;
 
-use datatypes::arrow::ipc::Map;
-use serde::{Deserialize, Serialize};
-
 use crate::error::Error;
-use crate::expr::{
-    AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
-    TypedExpr,
-};
+use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr};
 use crate::plan::join::JoinPlan;
 pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
-use crate::repr::{ColumnType, DiffRow, RelationDesc, RelationType};
+use crate::repr::{DiffRow, RelationDesc};
 
 /// A plan for a dataflow component, with a type to indicate the output type of the relation.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] diff --git a/src/flow/src/plan/join.rs b/src/flow/src/plan/join.rs index 4acf0db2342e..1a437dd00d33 100644 --- a/src/flow/src/plan/join.rs +++ b/src/flow/src/plan/join.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; - use crate::expr::ScalarExpr; use crate::plan::SafeMfpPlan; diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index 3d0d8b356a37..1edd0c40dd55 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; - -use crate::expr::{AggregateExpr, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr}; +use crate::expr::{AggregateExpr, SafeMfpPlan}; /// Describe how to extract key-value pair from a `Row` #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 188629e58db0..9ce1efa0a04f 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -17,14 +17,10 @@ mod relation; -use std::borrow::Borrow; -use std::slice::SliceIndex; - use api::helper::{pb_value_to_value_ref, value_to_grpc_value}; use api::v1::Row as ProtoRow; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; -use datatypes::types::cast::CastOption; use datatypes::value::Value; use itertools::Itertools; pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType}; diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index e470ad9dbdbf..65b75ffdcef8 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; - use datafusion_common::DFSchema; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; @@ -22,7 +20,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu}; -use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr}; +use crate::expr::{SafeMfpPlan, ScalarExpr}; /// a set of column indices that are "keys" for the collection. 
#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index a32ca197d289..d78f9219cb0c 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -20,35 +20,27 @@ use std::sync::Arc; use api::v1::{RowDeleteRequests, RowInsertRequests}; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; -use client::client_manager::NodeClients; use common_base::Plugins; use common_error::ext::BoxedError; -use common_grpc::channel_manager::ChannelConfig; -use common_meta::cache::{ - LayeredCacheRegistry, LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef, -}; -use common_meta::ddl::{table_meta, ProcedureExecutorRef}; -use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; +use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; -use common_meta::node_manager::{self, Flownode, NodeManagerRef}; +use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::Output; use common_telemetry::tracing::info; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; -use meta_client::client::MetaClient; use operator::delete::Deleter; use operator::insert::Inserter; use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; use query::{QueryEngine, QueryEngineFactory}; -use serde::de::Unexpected; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; -use servers::heartbeat_options::HeartbeatOptions; use servers::server::Server; -use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; +use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index f8075b5dc221..5441617b93ab 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -16,37 +16,25 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use bytes::buf::IntoIter; use common_error::ext::BoxedError; -use common_telemetry::info; use datafusion::optimizer::simplify_expressions::SimplifyExpressions; use datafusion::optimizer::{OptimizerContext, OptimizerRule}; use datatypes::data_type::ConcreteDataType as CDT; -use literal::{from_substrait_literal, from_substrait_type}; -use prost::Message; use query::parser::QueryLanguageParser; use query::plan::LogicalPlan; use query::query_engine::DefaultSerializer; use query::QueryEngine; use serde::{Deserialize, Serialize}; -use session::context::QueryContext; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; /// note here we are using the `substrait_proto_df` crate from the `substrait` module and /// rename it to `substrait_proto` -use substrait::{ - substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor, SubstraitPlan, -}; +use substrait::{substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor}; use substrait_proto::proto::extensions::simple_extension_declaration::MappingType; use 
 use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
 
 use crate::adapter::FlownodeContext;
-use crate::error::{
-    DatafusionSnafu, Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu,
-    TableNotFoundSnafu, UnexpectedSnafu,
-};
-use crate::expr::GlobalId;
+use crate::error::{DatafusionSnafu, Error, ExternalSnafu, NotImplementedSnafu, UnexpectedSnafu};
 use crate::plan::TypedPlan;
-use crate::repr::RelationType;
 
 /// a simple macro to generate a not implemented error
 macro_rules! not_impl_err {
     ($($arg:tt)*) => {
@@ -202,7 +190,7 @@ mod test {
 
     use catalog::RegisterTableRequest;
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
-    use common_time::{Date, DateTime};
+    use common_time::DateTime;
    use datatypes::prelude::*;
     use datatypes::schema::Schema;
     use datatypes::vectors::VectorRef;
@@ -219,7 +207,8 @@ mod test {
 
     use super::*;
     use crate::adapter::node_context::IdToNameMap;
-    use crate::repr::ColumnType;
+    use crate::expr::GlobalId;
+    use crate::repr::{ColumnType, RelationType};
 
     pub fn create_test_ctx() -> FlownodeContext {
         let mut schemas = HashMap::new();
diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs
index 64ecc3eec506..c07338047fe0 100644
--- a/src/flow/src/transform/aggr.rs
+++ b/src/flow/src/transform/aggr.rs
@@ -12,49 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 
-use common_decimal::Decimal128;
-use common_time::{Date, Timestamp};
-use datatypes::arrow::compute::kernels::window;
-use datatypes::arrow::ipc::Binary;
-use datatypes::data_type::{ConcreteDataType as CDT, DataType};
+use datatypes::data_type::DataType;
 use datatypes::value::Value;
-use hydroflow::futures::future::Map;
 use itertools::Itertools;
-use snafu::{OptionExt, ResultExt};
-use substrait::variation_const::{
-    DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
-    TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
-    TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
-    UNSIGNED_INTEGER_TYPE_VARIATION_REF,
-};
+use snafu::OptionExt;
 use substrait_proto::proto::aggregate_function::AggregationInvocation;
 use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
-use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
-use substrait_proto::proto::expression::literal::LiteralType;
-use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
-use substrait_proto::proto::expression::{
-    IfThen, Literal, MaskExpression, RexType, ScalarFunction,
-};
-use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
-use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
 use substrait_proto::proto::function_argument::ArgType;
-use substrait_proto::proto::r#type::Kind;
-use substrait_proto::proto::read_rel::ReadType;
-use substrait_proto::proto::rel::RelType;
-use substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel};
+use substrait_proto::proto::{self};
 
-use crate::error::{
-    DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu,
-    TableNotFoundSnafu,
-};
+use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
 use crate::expr::{
-    AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
-    TypedExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc,
+    AggregateExpr, AggregateFunc, BinaryFunc, MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc,
 };
 use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
-use crate::repr::{self, ColumnType, RelationDesc, RelationType};
+use crate::repr::{ColumnType, RelationDesc, RelationType};
 use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
 
 impl TypedExpr {
@@ -472,13 +446,14 @@ mod test {
     use bytes::BytesMut;
     use common_time::{DateTime, Interval};
     use datatypes::prelude::ConcreteDataType;
-    use pretty_assertions::{assert_eq, assert_ne};
+    use pretty_assertions::assert_eq;
 
     use super::*;
-    use crate::expr::{DfScalarFunction, RawDfScalarFn};
+    use crate::expr::{DfScalarFunction, GlobalId, RawDfScalarFn};
     use crate::plan::{Plan, TypedPlan};
-    use crate::repr::{self, ColumnType, RelationType};
+    use crate::repr::{ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
+    use crate::transform::CDT;
     /// TODO(discord9): add more illegal sql tests
     #[tokio::test]
     async fn test_missing_key_check() {
diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs
index 255ceadb54ca..01e06e96830e 100644
--- a/src/flow/src/transform/literal.rs
+++ b/src/flow/src/transform/literal.rs
@@ -34,8 +34,7 @@ use substrait::variation_const::{
 };
 use substrait_proto::proto::expression::literal::LiteralType;
 use substrait_proto::proto::expression::Literal;
-use substrait_proto::proto::r#type::{self, parameter, Kind, Parameter};
-use substrait_proto::proto::Type;
+use substrait_proto::proto::r#type::Kind;
 
 use crate::error::{Error, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
 use crate::transform::substrait_proto;
diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs
index 200226fb352a..6841140989d9 100644
--- a/src/flow/src/transform/plan.rs
+++ b/src/flow/src/transform/plan.rs
@@ -22,11 +22,9 @@ use substrait_proto::proto::read_rel::ReadType;
 use substrait_proto::proto::rel::RelType;
 use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
 
-use crate::error::{
-    Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
-};
+use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
 use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};
-use crate::plan::{KeyValPlan, Plan, ReducePlan, TypedPlan};
+use crate::plan::{KeyValPlan, Plan, TypedPlan};
 use crate::repr::{self, RelationDesc, RelationType};
 use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
 
@@ -350,7 +348,7 @@ mod test {
     use super::*;
     use crate::expr::{GlobalId, ScalarExpr};
     use crate::plan::{Plan, TypedPlan};
-    use crate::repr::{self, ColumnType, RelationType};
+    use crate::repr::{ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
     use crate::transform::CDT;
 
diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 69ff8fa2d248..778fde49c9a3 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -19,14 +19,11 @@ use std::ops::Bound;
 use std::sync::Arc;
 
 use common_telemetry::debug;
-use itertools::Itertools;
-use serde::{Deserialize, Serialize};
 use smallvec::{smallvec, SmallVec};
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::RwLock;
 
-use crate::expr::error::InternalSnafu;
 use crate::expr::{EvalError, ScalarExpr};
-use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
+use crate::repr::{value_to_internal_ts, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
 
 /// A batch of updates, arranged by key
 pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
@@ -585,6 +582,7 @@ mod test {
     use std::borrow::Borrow;
 
     use datatypes::value::Value;
+    use itertools::Itertools;
 
     use super::*;