feat(flow): add eval_batch for ScalarExpr (#4551)
* refactor: better perf flow

* feat(WIP): batching proc

* feat: UnaryFunc::eval_batch untested

* feat: BinaryFunc::eval_batch untested

* feat: VariadicFunc::eval_batch untested

* feat: literal eval_batch

* refactor: move DfScalarFunc to separate file

* chore: remove unused imports

* feat: eval_batch df func&ifthen

* chore: remove unused file

* refactor: use Batch type

* chore: remove unused

* chore: remove a done TODO

* refactor: per review

* chore: import

* refactor: eval_batch if then

* chore: typo
discord9 committed Aug 14, 2024
1 parent c1b1be4 commit 2c3fccb
Showing 30 changed files with 981 additions and 390 deletions.
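
For orientation before the file-by-file diff, here is a rough, hypothetical sketch of the batched-evaluation contract this commit introduces. The trait name and signatures below are illustrative only: the real eval_batch methods on ScalarExpr, UnaryFunc, BinaryFunc and VariadicFunc live in files that are not rendered in this excerpt, while Batch and EvalError are the crate-internal types shown in the src/flow/src/expr.rs hunk further down.

// Illustrative only: a hypothetical shape (`BatchEval` is not a trait in this
// commit) contrasting the row-at-a-time and batched evaluation paths. The
// actual `eval_batch` definitions are in files this excerpt does not render.
use datatypes::value::Value;
use datatypes::vectors::VectorRef;

use crate::expr::{Batch, EvalError};

trait BatchEval {
    /// Existing path: evaluate one row of `Value`s at a time.
    fn eval(&self, values: &[Value]) -> Result<Value, EvalError>;

    /// Batched path: evaluate whole columns at once and produce one output
    /// column, amortizing dispatch over `batch.row_count()` rows.
    fn eval_batch(&self, batch: &Batch) -> Result<VectorRef, EvalError>;
}
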
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
@@ -9,6 +9,7 @@ workspace = true

[dependencies]
api.workspace = true
arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
34 changes: 10 additions & 24 deletions src/flow/src/compute/render.rs
@@ -16,32 +16,21 @@
//!
//! And the [`Context`] is the environment for the render process; it carries all the information the rendering needs

use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::rc::Rc;

use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::futures::SinkExt;
use hydroflow::lattices::cc_traits::Get;
use std::collections::BTreeMap;

use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::OptionExt;

use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
};
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};

mod map;
mod reduce;
@@ -218,20 +207,17 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;

use common_time::DateTime;
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::{assert_eq, assert_ne};
use pretty_assertions::assert_eq;

use super::*;
use crate::expr::BinaryFunc;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
time_range: Range<i64>,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
) {
5 changes: 2 additions & 3 deletions src/flow/src/compute/render/map.rs
@@ -24,7 +24,7 @@ use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::{Plan, TypedPlan};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;

@@ -206,8 +206,6 @@ fn eval_mfp_core(

#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;

use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
@@ -216,6 +214,7 @@ mod test {
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, BinaryFunc, GlobalId};
use crate::plan::Plan;
use crate::repr::{ColumnType, RelationType};

/// test if temporal filter works properly
13 changes: 6 additions & 7 deletions src/flow/src/compute/render/reduce.rs
@@ -18,17 +18,15 @@ use std::ops::Range;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

use crate::compute::render::{Context, SubgraphArg};
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::expr::{EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};

@@ -790,8 +788,6 @@ fn from_val_to_slice_idx(
// TODO(discord9): add tests for accum ser/de
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;

use common_time::{DateTime, Interval, Timestamp};
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
@@ -800,7 +796,10 @@ mod test {
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc};
use crate::expr::{
self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
};
use crate::plan::Plan;
use crate::repr::{ColumnType, RelationType};

/// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00')
4 changes: 2 additions & 2 deletions src/flow/src/compute/render/src_sink.rs
@@ -16,7 +16,7 @@

use std::collections::{BTreeMap, VecDeque};

use common_telemetry::{debug, info};
use common_telemetry::debug;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
@@ -27,7 +27,7 @@ use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, GlobalId};
use crate::expr::EvalError;
use crate::repr::{DiffRow, Row, BROADCAST_CAP};

#[allow(clippy::mutable_key_type)]
2 changes: 1 addition & 1 deletion src/flow/src/compute/state.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;

use hydroflow::scheduled::graph::Hydroflow;
5 changes: 2 additions & 3 deletions src/flow/src/compute/types.rs
@@ -22,12 +22,11 @@ use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;

use crate::compute::render::Context;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::DiffRow;
use crate::utils::{ArrangeHandler, Arrangement};
use crate::utils::ArrangeHandler;

pub type Toff<T = DiffRow> = TeeingHandoff<T>;

88 changes: 86 additions & 2 deletions src/flow/src/expr.rs
@@ -14,6 +14,7 @@

//! for declare Expression in dataflow, including map, reduce, id and join(TODO!) etc.

mod df_func;
pub(crate) mod error;
mod func;
mod id;
@@ -22,9 +23,92 @@ mod relation;
mod scalar;
mod signature;

pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
use datatypes::prelude::DataType;
use datatypes::vectors::VectorRef;
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub(crate) use id::{GlobalId, Id, LocalId};
use itertools::Itertools;
pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
pub(crate) use relation::{AggregateExpr, AggregateFunc};
pub(crate) use scalar::{DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr};
pub(crate) use scalar::{ScalarExpr, TypedExpr};
use snafu::{ensure, ResultExt};

use crate::expr::error::DataTypeSnafu;

/// A batch of vectors that all have the same length but carry no schema; only used inside dataflow
pub struct Batch {
    batch: Vec<VectorRef>,
    row_count: usize,
}

impl Batch {
    pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
        Self { batch, row_count }
    }

    pub fn batch(&self) -> &[VectorRef] {
        &self.batch
    }

    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Slices the `Batch`, returning a new `Batch`.
    ///
    /// # Panics
    /// This function panics if `offset + length > self.row_count()`.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let batch = self
            .batch()
            .iter()
            .map(|v| v.slice(offset, length))
            .collect_vec();
        Batch::new(batch, length)
    }

    /// Appends another batch to `self`.
    pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
        ensure!(
            self.batch.len() == other.batch.len(),
            InvalidArgumentSnafu {
                reason: format!(
                    "Expected two batches to have the same number of columns, found {} and {} columns",
                    self.batch.len(),
                    other.batch.len()
                )
            }
        );

        let batch_builders = self
            .batch
            .iter()
            .map(|v| {
                v.data_type()
                    .create_mutable_vector(self.row_count() + other.row_count())
            })
            .collect_vec();

        let mut result = vec![];
        let zelf_row_count = self.row_count();
        let other_row_count = other.row_count();
        for (idx, mut builder) in batch_builders.into_iter().enumerate() {
            builder
                .extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
                .context(DataTypeSnafu {
                    msg: "Failed to extend vector",
                })?;
            builder
                .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
                .context(DataTypeSnafu {
                    msg: "Failed to extend vector",
                })?;
            result.push(builder.to_vector());
        }
        self.batch = result;
        self.row_count = zelf_row_count + other_row_count;
        Ok(())
    }
}
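
A minimal usage sketch of the new Batch helper, using only the methods shown in this hunk (new, batch, row_count, slice, append_batch). The input columns are taken as parameters so the sketch does not assume any particular vector constructor from datatypes::vectors; the function name is hypothetical.

use datatypes::vectors::VectorRef;

use crate::expr::{Batch, EvalError};

/// Splits a set of equal-length columns in half and stitches the halves back
/// together with `append_batch`, recovering the original row count.
fn split_and_rejoin(columns: Vec<VectorRef>, row_count: usize) -> Result<Batch, EvalError> {
    let full = Batch::new(columns, row_count);

    // `slice` delegates to each column's `slice(offset, length)` and panics
    // when `offset + length > row_count`, so stay within bounds.
    let mid = row_count / 2;
    let mut head = full.slice(0, mid);
    let tail = full.slice(mid, row_count - mid);

    // `append_batch` copies both inputs into fresh mutable-vector builders and
    // errors out if the two batches have different column counts.
    head.append_batch(tail)?;
    debug_assert_eq!(head.row_count(), row_count);
    Ok(head)
}

Note that `append_batch` rebuilds every column through a builder rather than concatenating zero-copy, which keeps the resulting vectors contiguous at the cost of copying both inputs.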