feat: Comet window functions support #200

Closed
wants to merge 8 commits into from
163 changes: 157 additions & 6 deletions core/src/execution/datafusion/planner.rs
@@ -24,7 +24,10 @@ use datafusion::{
arrow::{compute::SortOptions, datatypes::SchemaRef},
common::DataFusionError,
execution::FunctionRegistry,
logical_expr::Operator as DataFusionOperator,
logical_expr::{
expr::find_df_window_func, Operator as DataFusionOperator, WindowFrame, WindowFrameBound,
WindowFrameUnits,
},
physical_expr::{
execution_props::ExecutionProps,
expressions::{
@@ -42,7 +45,8 @@ use datafusion::{
limit::LocalLimitExec,
projection::ProjectionExec,
sorts::sort::SortExec,
ExecutionPlan, Partitioning,
windows::BoundedWindowAggExec,
ExecutionPlan, InputOrderMode, Partitioning, WindowExpr,
},
prelude::SessionContext,
};
@@ -87,12 +91,15 @@ use crate::{
},
operators::{CopyExec, ExecutionError, ScanExec},
serde::to_arrow_datatype,
spark_expression,
spark_expression::{
agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, Expr,
ScalarFunc,
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
Expr, ScalarFunc,
},
spark_operator::{
lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator,
WindowFrameType,
},
spark_operator::{operator::OpStruct, BuildSide, JoinType, Operator},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
},
};
@@ -870,6 +877,50 @@ impl PhysicalPlanner {
)?),
))
}
OpStruct::Window(wnd) => {
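// Translate Spark's Window operator: build the sort and partition expressions and the
// window expressions against the child's schema, then wrap the child plan in a
// BoundedWindowAggExec that expects sorted input (InputOrderMode::Sorted).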
let (scans, child) = self.create_plan(&children[0], inputs)?;
let input_schema = child.schema();
let sort_exprs: Result<Vec<PhysicalSortExpr>, ExecutionError> = wnd
.order_by_list
.iter()
.map(|expr| self.create_sort_expr(expr, input_schema.clone()))
.collect();

let partition_exprs: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = wnd
.partition_by_list
.iter()
.map(|expr| self.create_expr(expr, input_schema.clone()))
.collect();

let sort_exprs = &sort_exprs?;
let partition_exprs = &partition_exprs?;

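// Build one DataFusion WindowExpr for each serialized Spark window expression.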
let window_expr: Result<Vec<Arc<dyn WindowExpr>>, ExecutionError> = wnd
.window_expr
.iter()
.map(|expr| {
self.create_window_expr(
expr,
input_schema.clone(),
partition_exprs,
sort_exprs,
)
})
.collect();

Ok((
scans,
Arc::new(BoundedWindowAggExec::try_new(
window_expr?,
child,
partition_exprs.to_vec(),
InputOrderMode::Sorted,
)?),
))
}
OpStruct::Expand(expand) => {
assert!(children.len() == 1);
let (scans, child) = self.create_plan(&children[0], inputs)?;
@@ -1330,6 +1381,106 @@ impl PhysicalPlanner {
}
}

/// Create a DataFusion window physical expression from a Spark window expression
fn create_window_expr<'a>(
&'a self,
spark_expr: &'a crate::execution::spark_operator::WindowExpr,
input_schema: SchemaRef,
partition_by: &[Arc<dyn PhysicalExpr>],
sort_exprs: &[PhysicalSortExpr],
) -> Result<Arc<dyn WindowExpr>, ExecutionError> {
let (window_func_name, window_func_args) =
match &spark_expr.func.as_ref().unwrap().expr_struct.as_ref() {
Some(ExprStruct::ScalarFunc(f)) => (f.func.clone(), f.args.clone()),
other => {
return Err(ExecutionError::GeneralError(format!(
"{other:?} not supported for window function"
)))
}
};

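// Resolve the function name against DataFusion's built-in window functions.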
let window_func = match find_df_window_func(&window_func_name) {
Some(f) => f,
_ => {
return Err(ExecutionError::GeneralError(format!(
"{window_func_name} not supported for window function"
)))
}
};

let window_args = window_func_args
.iter()
.map(|expr| self.create_expr(expr, input_schema.clone()))
.collect::<Result<Vec<_>, ExecutionError>>()?;

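// The frame specification must be present in the serialized window spec; otherwise fail deserialization.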
let spark_window_frame = match spark_expr
.spec
.as_ref()
.and_then(|inner| inner.frame_specification.as_ref())
{
Some(frame) => frame,
_ => {
return Err(ExecutionError::DeserializeError(
"Cannot deserialize window frame".to_string(),
))
}
};

let units = match spark_window_frame.frame_type() {
WindowFrameType::Rows => WindowFrameUnits::Rows,
WindowFrameType::Range => WindowFrameUnits::Range,
};

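// Map the Spark lower frame bound onto DataFusion's WindowFrameBound; a missing or
// unbounded bound becomes Preceding(NULL), i.e. UNBOUNDED PRECEDING.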
let lower_bound: WindowFrameBound = match spark_window_frame
.lower_bound
.as_ref()
.and_then(|inner| inner.lower_frame_bound_struct.as_ref())
{
Some(l) => match l {
LowerFrameBoundStruct::UnboundedPreceding(_) => {
WindowFrameBound::Preceding(ScalarValue::Null)
}
LowerFrameBoundStruct::Preceding(offset) => {
WindowFrameBound::Preceding(ScalarValue::Int32(Some(offset.offset)))
}
LowerFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow,
},
None => WindowFrameBound::Preceding(ScalarValue::Null),
};

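// Map the Spark upper frame bound onto DataFusion's WindowFrameBound; a missing or
// unbounded bound becomes Following(NULL), i.e. UNBOUNDED FOLLOWING.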
let upper_bound: WindowFrameBound = match spark_window_frame
.upper_bound
.as_ref()
.and_then(|inner| inner.upper_frame_bound_struct.as_ref())
{
Some(u) => match u {
UpperFrameBoundStruct::UnboundedFollowing(_) => {
WindowFrameBound::Following(ScalarValue::Null)
}
UpperFrameBoundStruct::Following(offset) => {
WindowFrameBound::Following(ScalarValue::Int32(Some(offset.offset)))
}
UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow,
},
None => WindowFrameBound::Following(ScalarValue::Null),
};

let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound);

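// Delegate to DataFusion to construct the physical window expression.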
datafusion::physical_plan::windows::create_window_expr(
&window_func,
window_func_name,
&window_args,
partition_by,
sort_exprs,
window_frame.into(),
&input_schema,
false, // TODO: Ignore nulls
)
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))
}

/// Create a DataFusion physical partitioning from Spark physical partitioning
fn create_partitioning(
&self,
1 change: 1 addition & 0 deletions core/src/execution/jni_api.rs
@@ -310,6 +310,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
try_unwrap_or_throw(&e, |mut env| {
// Retrieve the query
let exec_context = get_execution_context(exec_context);
//dbg!(&exec_context.spark_plan.children[0]);

let exec_context_id = exec_context.id;

58 changes: 58 additions & 0 deletions core/src/execution/proto/operator.proto
@@ -42,6 +42,7 @@ message Operator {
Expand expand = 107;
SortMergeJoin sort_merge_join = 108;
HashJoin hash_join = 109;
Window window = 110;
}
}

@@ -120,3 +121,60 @@ enum BuildSide {
BuildLeft = 0;
BuildRight = 1;
}

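// A single window expression: the window function to evaluate plus the window
// specification (partitioning, ordering and frame) it is evaluated over.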
message WindowExpr {
spark.spark_expression.Expr func = 1;
WindowSpecDefinition spec = 2;
}

enum WindowFrameType {
Rows = 0;
Range = 1;
}

message WindowFrame {
WindowFrameType frame_type = 1;
LowerWindowFrameBound lower_bound = 2;
UpperWindowFrameBound upper_bound = 3;
}

message LowerWindowFrameBound {
oneof lower_frame_bound_struct {
UnboundedPreceding unboundedPreceding = 1;
Preceding preceding = 2;
CurrentRow currentRow = 3;
}
}

message UpperWindowFrameBound {
oneof upper_frame_bound_struct {
UnboundedFollowing unboundedFollowing = 1;
Following following = 2;
CurrentRow currentRow = 3;
}
}

message Preceding {
int32 offset = 1;
}

message Following {
int32 offset = 1;
}

message UnboundedPreceding {}
message UnboundedFollowing {}
message CurrentRow {}

message WindowSpecDefinition {
repeated spark.spark_expression.Expr partitionSpec = 1;
repeated spark.spark_expression.Expr orderSpec = 2;
WindowFrame frameSpecification = 3;
}

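// The Window operator: a set of window expressions evaluated over the child operator,
// using the given partitioning and ordering expressions.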
message Window {
repeated WindowExpr window_expr = 1;
repeated spark.spark_expression.Expr order_by_list = 2;
repeated spark.spark_expression.Expr partition_by_list = 3;
Operator child = 4;
}
56 changes: 56 additions & 0 deletions docs/source/contributor-guide/debugging.md
@@ -31,6 +31,62 @@ This HOWTO describes how to debug JVM code and Native code concurrently. The gui

_Caveat: The steps here have only been tested with JDK 11_ on Mac (M1)

## Expand Comet exception details

By default, Comet outputs only the exception details specific to Comet. The details can be expanded by enabling DataFusion [backtraces](https://arrow.apache.org/datafusion/user-guide/example-usage.html#enable-backtraces).

```scala
scala> spark.sql("my_failing_query").show(false)

24/03/05 17:00:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.comet.CometNativeException: Internal error: MIN/MAX is not expected to receive scalars of incompatible types (Date32("NULL"), Int32(15901)).
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
at org.apache.comet.Native.executePlan(Native Method)
at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:65)
at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:111)
at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:126)

```
To do that, enable the `backtrace` feature of the DataFusion dependencies in https://github.com/apache/arrow-datafusion-comet/blob/main/core/Cargo.toml:

```toml
datafusion-common = { version = "36.0.0", features = ["backtrace"] }
datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions", "backtrace"] }
```

Then build Comet as [described](https://github.com/apache/arrow-datafusion-comet/blob/main/README.md#getting-started).

Start Comet with `RUST_BACKTRACE=1`

```commandline
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true --conf spark.comet.exec.all.enabled=true
```

Run the failing query again to get the expanded exception details:
```scala
scala> spark.sql("my_failing_query").show(false)
24/03/05 17:00:49 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.comet.CometNativeException: Internal error: MIN/MAX is not expected to receive scalars of incompatible types (Date32("NULL"), Int32(15901))

backtrace: 0: std::backtrace::Backtrace::create
1: datafusion_physical_expr::aggregate::min_max::min
2: <datafusion_physical_expr::aggregate::min_max::MinAccumulator as datafusion_expr::accumulator::Accumulator>::update_batch
3: <futures_util::stream::stream::fuse::Fuse<S> as futures_core::stream::Stream>::poll_next
4: comet::execution::jni_api::Java_org_apache_comet_Native_executePlan::{{closure}}
5: _Java_org_apache_comet_Native_executePlan
.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
at org.apache.comet.Native.executePlan(Native Method)
at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:65)
at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:111)
at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:126)

...
```
Note:
- Backtrace coverage in DataFusion is still improving, so some errors may not be covered yet; feel free to file a [ticket](https://github.com/apache/arrow-datafusion/issues)
- Backtraces are not free to compute, so they are intended for debugging purposes only

## Debugging for Advanced Developers

Add a `.lldbinit` to comet/core. This is not strictly necessary but will be useful if you want to
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -532,6 +533,16 @@ class CometSparkSessionExtensions
s
}

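// Serialize Spark's WindowExec into the native Window operator; if serialization
// succeeds, wrap the corresponding CometWindowExec in a CometSinkPlaceHolder,
// otherwise keep the original Spark operator.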
case w: WindowExec =>
QueryPlanSerde.operator2Proto(w) match {
case Some(nativeOp) =>
val cometOp =
CometWindowExec(w, w.windowExpression, w.partitionSpec, w.orderSpec, w.child)
CometSinkPlaceHolder(nativeOp, w, cometOp)
case None =>
w
}

case s: TakeOrderedAndProjectExec =>
val info1 = createMessage(
!isCometOperatorEnabled(conf, "takeOrderedAndProjectExec"),