Commit 0ebc59f

rebase
1 parent ebdde77 commit 0ebc59f

4 files changed: +36 −3 lines changed


native/core/src/execution/datafusion/planner.rs

Lines changed: 16 additions & 3 deletions
@@ -1056,8 +1056,13 @@ impl PhysicalPlanner {
         };

         // The `ScanExec` operator will take actual arrays from Spark during execution
-        let scan =
-            ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
+        let scan = ScanExec::new(
+            self.exec_context_id,
+            input_source,
+            &scan.source,
+            data_types,
+            scan.reuses_buffers,
+        )?;
         Ok((
             vec![scan.clone()],
             Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
@@ -2138,8 +2143,13 @@ impl From<ExpressionError> for DataFusionError {
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
     if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
         can_reuse_input_batch(op.children()[0])
+    } else if op.as_any().is::<ScanExec>() {
+        op.as_any()
+            .downcast_ref::<ScanExec>()
+            .unwrap()
+            .reuses_buffers
     } else {
-        op.as_any().is::<ScanExec>()
+        false
     }
 }

@@ -2312,6 +2322,7 @@ mod tests {
                 type_info: None,
             }],
             source: "".to_string(),
+            reuses_buffers: false,
         })),
     };

@@ -2385,6 +2396,7 @@ mod tests {
                 type_info: None,
             }],
             source: "".to_string(),
+            reuses_buffers: false,
         })),
     };

@@ -2603,6 +2615,7 @@ mod tests {
         op_struct: Some(OpStruct::Scan(spark_operator::Scan {
             fields: vec![create_proto_datatype()],
             source: "".to_string(),
+            reuses_buffers: false,
         })),
     }
 }
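The behavioral change is in `can_reuse_input_batch`: previously every `ScanExec` was assumed to re-use buffers, whereas now the answer comes from the new per-scan flag, so only sources that actually recycle their allocations are treated that way. A standalone sketch of the resulting control flow, using a hypothetical simplified `Plan` enum in place of DataFusion's `ExecutionPlan` tree:

// Simplified stand-in for the operator tree (not Comet code).
enum Plan {
    Projection(Box<Plan>),
    LocalLimit(Box<Plan>),
    Scan { reuses_buffers: bool },
    Other,
}

fn can_reuse_input_batch(op: &Plan) -> bool {
    match op {
        // Projections and local limits pass batches through unchanged,
        // so the answer depends on their input.
        Plan::Projection(child) | Plan::LocalLimit(child) => can_reuse_input_batch(child),
        // Previously every scan answered true; now only scans that
        // actually recycle mutable buffers do.
        Plan::Scan { reuses_buffers } => *reuses_buffers,
        // Any other operator is assumed to produce fresh output.
        Plan::Other => false,
    }
}

fn main() {
    // A buffer-reusing scan under a projection still reports reuse:
    let native = Plan::Projection(Box::new(Plan::Scan { reuses_buffers: true }));
    assert!(can_reuse_input_batch(&native));

    // A scan with reuses_buffers = false no longer does:
    let jvm = Plan::LocalLimit(Box::new(Plan::Scan { reuses_buffers: false }));
    assert!(!can_reuse_input_batch(&jvm));

    // Other operators break the chain:
    assert!(!can_reuse_input_batch(&Plan::Other));
}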

native/core/src/execution/operators/scan.rs

Lines changed: 4 additions & 0 deletions
@@ -64,6 +64,8 @@ pub struct ScanExec {
     pub input_source: Option<Arc<GlobalRef>>,
     /// A description of the input source for informational purposes
     pub input_source_description: String,
+    /// Some sources (currently only the native Parquet scan) re-use mutable buffers
+    pub reuses_buffers: bool,
     /// The data types of columns of the input batch. Converted from Spark schema.
     pub data_types: Vec<DataType>,
     /// Schema of first batch
@@ -85,6 +87,7 @@ impl ScanExec {
         input_source: Option<Arc<GlobalRef>>,
         input_source_description: &str,
         data_types: Vec<DataType>,
+        reuses_buffers: bool,
     ) -> Result<Self, CometError> {
         let metrics_set = ExecutionPlanMetricsSet::default();
         let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -119,6 +122,7 @@ impl ScanExec {
             exec_context_id,
             input_source,
             input_source_description: input_source_description.to_string(),
+            reuses_buffers,
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
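The doc comment on the new field captures the contract that motivates the planner logic: a source that recycles a mutable buffer invalidates previously returned batches on every call, so a consumer that holds a batch across polls must copy it first. A minimal, self-contained illustration of that hazard (plain Rust, not Comet code; `ReusingSource` is hypothetical):

// One mutable buffer, re-used for every batch the source produces.
struct ReusingSource {
    buffer: Vec<i32>,
}

impl ReusingSource {
    /// Overwrites the shared buffer in place and hands out a view of it.
    fn next_batch(&mut self, value: i32) -> &[i32] {
        for slot in self.buffer.iter_mut() {
            *slot = value;
        }
        &self.buffer
    }
}

fn main() {
    let mut source = ReusingSource { buffer: vec![0; 4] };

    // A consumer that needs the first batch after the second arrives must
    // copy it eagerly; the underlying memory is about to be overwritten.
    let saved: Vec<i32> = source.next_batch(1).to_vec(); // defensive copy
    let second: Vec<i32> = source.next_batch(2).to_vec();

    assert_eq!(saved, vec![1, 1, 1, 1]); // the copy survived the overwrite
    assert_eq!(second, vec![2, 2, 2, 2]);
}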

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ message Scan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
+  bool reuses_buffers = 3;
 }

 message Projection {
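Appending the flag as field 3 keeps the `Scan` message wire-compatible: in proto3 an absent scalar decodes to its zero value, so plans serialized before this change read back with `reuses_buffers = false`. A sketch of that default, assuming a prost-style generated struct (hypothetical, simplified):

// Hypothetical, simplified stand-in for the struct generated from
// operator.proto; the real one is produced by the build.
#[derive(Debug, Default)]
struct Scan {
    source: String,
    reuses_buffers: bool, // field 3; absent on the wire decodes as false
}

fn main() {
    // A plan serialized without the new field decodes to the proto3 default:
    let old_plan = Scan { source: "scan".to_string(), ..Default::default() };
    assert!(!old_plan.reuses_buffers);
}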

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 15 additions & 0 deletions
@@ -35,6 +35,9 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
@@ -2910,6 +2913,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         // These operators are source of Comet native execution chain
         val scanBuilder = OperatorOuterClass.Scan.newBuilder()
         scanBuilder.setSource(op.simpleStringWithNodeId())
+        scanBuilder.setReusesBuffers(isParquetScan(op))

         val scanTypes = op.output.flatten { attr =>
           serializeDataType(attr.dataType)
@@ -2944,6 +2948,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     }
   }

+  def isParquetScan(op: SparkPlan): Boolean = {
+    op match {
+      case scan: FileSourceScanExec =>
+        scan.relation.fileFormat.isInstanceOf[ParquetFileFormat]
+      case scan: BatchScanExec =>
+        scan.scan.isInstanceOf[ParquetScan]
+      case _ =>
+        false
+    }
+  }
+
   /**
    * Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
    * native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
