perf: Stop performing deep copies when CometScan source is an exchange #1097

Draft · wants to merge 1 commit into main
16 changes: 13 additions & 3 deletions native/core/src/execution/datafusion/planner.rs
@@ -990,8 +990,13 @@ impl PhysicalPlanner {
     };

     // The `ScanExec` operator will take actual arrays from Spark during execution
-    let scan =
-        ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
+    let scan = ScanExec::new(
+        self.exec_context_id,
+        input_source,
+        &scan.source,
+        data_types,
+        scan.reuses_buffers,
+    )?;
     Ok((vec![scan.clone()], Arc::new(scan)))
 }
 OpStruct::ShuffleWriter(writer) => {
@@ -2026,8 +2031,13 @@ impl From<ExpressionError> for DataFusionError {
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
     if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
         can_reuse_input_batch(op.children()[0])
+    } else if op.as_any().is::<ScanExec>() {
+        op.as_any()
+            .downcast_ref::<ScanExec>()
+            .unwrap()
+            .reuses_buffers
     } else {
-        op.as_any().is::<ScanExec>()
+        false
     }
 }

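For context, the new branch uses Rust's standard `Any` downcasting to read the flag off the concrete `ScanExec`. A minimal self-contained sketch of the pattern and the new semantics, with toy stand-ins for DataFusion's `ExecutionPlan` and Comet's `ScanExec` (not the real definitions):

```rust
use std::any::Any;
use std::sync::Arc;

// Toy stand-ins for DataFusion's ExecutionPlan and Comet's ScanExec.
trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

struct ScanExec {
    reuses_buffers: bool,
}

impl ExecutionPlan for ScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Mirrors the patched logic: a scan only forces downstream deep copies
// when its source actually recycles buffers (e.g. a Parquet reader);
// an exchange-backed scan reports false, so the copy is skipped.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
    op.as_any()
        .downcast_ref::<ScanExec>()
        .map(|scan| scan.reuses_buffers)
        .unwrap_or(false)
}

fn main() {
    let exchange_scan: Arc<dyn ExecutionPlan> = Arc::new(ScanExec { reuses_buffers: false });
    assert!(!can_reuse_input_batch(&exchange_scan));

    let parquet_scan: Arc<dyn ExecutionPlan> = Arc::new(ScanExec { reuses_buffers: true });
    assert!(can_reuse_input_batch(&parquet_scan));
}
```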
3 changes: 3 additions & 0 deletions native/core/src/execution/operators/scan.rs
@@ -72,6 +72,7 @@ pub struct ScanExec {
     cache: PlanProperties,
     /// Metrics collector
     metrics: ExecutionPlanMetricsSet,
+    pub reuses_buffers: bool,
 }

 impl ScanExec {
@@ -80,6 +81,7 @@ impl ScanExec {
         input_source: Option<Arc<GlobalRef>>,
         input_source_description: &str,
         data_types: Vec<DataType>,
+        reuses_buffers: bool,
     ) -> Result<Self, CometError> {
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         // Note that we determine if arrays are dictionary-encoded based on the
@@ -111,6 +113,7 @@ impl ScanExec {
         batch: Arc::new(Mutex::new(Some(first_batch))),
         cache,
         metrics: ExecutionPlanMetricsSet::default(),
+        reuses_buffers,
     })
 }

1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
@@ -52,6 +52,7 @@ message Scan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
+  bool reuses_buffers = 3;
 }

 message Projection {
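Because the file is proto3, the new scalar field is optional on the wire and decodes to its default when absent. A hedged sketch of what prost-style codegen would produce for the updated message (other fields elided; the actual generated type lives in Comet's protobuf module):

```rust
// Hedged sketch of prost output for `message Scan` after this change;
// derives and remaining fields trimmed for brevity.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Scan {
    #[prost(string, tag = "2")]
    pub source: ::prost::alloc::string::String,
    /// proto3 scalars have implicit presence: a plan serialized before
    /// this field existed decodes with the default value `false`.
    #[prost(bool, tag = "3")]
    pub reuses_buffers: bool,
}
```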
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1229,6 +1229,17 @@ object CometSparkSessionExtensions extends Logging {
     op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
   }

+  def isParquetScan(op: SparkPlan): Boolean = {
+    op match {
+      case scan: FileSourceScanExec =>
+        scan.relation.fileFormat.isInstanceOf[ParquetFileFormat]
+      case scan: BatchScanExec =>
+        scan.scan.isInstanceOf[ParquetScan]
+      case _ =>
+        false
+    }
+  }
+
   private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
     // Only consider converting leaf nodes to columnar currently, so that all the following
     // operators can have a chance to be converted to columnar. Leaf operators that output
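The Parquet-only check reflects the PR's premise: a Parquet reader presumably recycles its buffers between batches, while an exchange hands each batch freshly allocated memory, so only the former needs defensive copies downstream. A toy Rust demonstration of the hazard those copies guard against, with a plain `Vec` standing in for Arrow buffers:

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn main() {
    // A producer that recycles one buffer across "batches", the way a
    // columnar reader may overwrite scratch space before the next emit.
    let buffer = Rc::new(RefCell::new(vec![1, 2, 3]));

    // A consumer that retains the batch without a deep copy holds only
    // a cheap handle to the shared buffer...
    let retained = Rc::clone(&buffer);

    // ...so refilling the buffer for the next batch silently mutates
    // the batch the consumer thought it had saved.
    *buffer.borrow_mut() = vec![4, 5, 6];
    assert_eq!(*retained.borrow(), vec![4, 5, 6]);

    // An exchange allocates a fresh buffer per batch, so nothing aliases
    // the retained data and the defensive copy can be skipped.
}
```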
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isParquetScan, isSpark34Plus, withInfo}
 import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, RegExp, Unsupported}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -2881,6 +2881,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     // These operators are source of Comet native execution chain
     val scanBuilder = OperatorOuterClass.Scan.newBuilder()
     scanBuilder.setSource(op.simpleStringWithNodeId())
+    scanBuilder.setReusesBuffers(isParquetScan(op))

     val scanTypes = op.output.flatten { attr =>
       serializeDataType(attr.dataType)