
Commit 311bc9e

Revert "wip - CometNativeScan (#1076)"
This reverts commit 38e32f7.
1 parent 38e32f7 · commit 311bc9e

9 files changed: +148 −716 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 10 deletions
@@ -77,16 +77,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)

-  val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.native.scan.enabled")
-    .internal()
-    .doc(
-      "Whether to enable the fully native scan. When this is turned on, Spark will use Comet to " +
-        "read supported data sources (currently only Parquet is supported natively)." +
-        " By default, this config is true.")
-    .booleanConf
-    .createWithDefault(true)
-
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(

native/core/src/execution/datafusion/planner.rs

Lines changed: 94 additions & 92 deletions
@@ -948,116 +948,118 @@ impl PhysicalPlanner {
                     Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
                 ))
             }
-            OpStruct::NativeScan(scan) => {
+            OpStruct::Scan(scan) => {
                 let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

-                println!("NATIVE: SCAN: {:?}", scan);
-                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
-                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
-                println!("data_schema: {:?}", data_schema);
-                println!("required_schema: {:?}", required_schema);
-
-                let data_schema_descriptor =
-                    parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
-                let data_schema_arrow = Arc::new(
-                    parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
+                if scan.source == "CometScan parquet (unknown)" {
+                    let data_schema = parse_message_type(&scan.data_schema).unwrap();
+                    let required_schema = parse_message_type(&scan.required_schema).unwrap();
+                    println!("data_schema: {:?}", data_schema);
+                    println!("required_schema: {:?}", required_schema);
+
+                    let data_schema_descriptor =
+                        parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
+                    let data_schema_arrow = Arc::new(
+                        parquet::arrow::schema::parquet_to_arrow_schema(
+                            &data_schema_descriptor,
+                            None,
+                        )
                         .unwrap(),
-                );
-                println!("data_schema_arrow: {:?}", data_schema_arrow);
-
-                let required_schema_descriptor =
-                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
-                let required_schema_arrow = Arc::new(
-                    parquet::arrow::schema::parquet_to_arrow_schema(
-                        &required_schema_descriptor,
-                        None,
-                    )
-                    .unwrap(),
-                );
-                println!("required_schema_arrow: {:?}", required_schema_arrow);
-
-                assert!(!required_schema_arrow.fields.is_empty());
-
-                let mut projection_vector: Vec<usize> =
-                    Vec::with_capacity(required_schema_arrow.fields.len());
-                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                required_schema_arrow.fields.iter().for_each(|field| {
-                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                });
-                println!("projection_vector: {:?}", projection_vector);
+                    );
+                    println!("data_schema_arrow: {:?}", data_schema_arrow);
+
+                    let required_schema_descriptor =
+                        parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                    let required_schema_arrow = Arc::new(
+                        parquet::arrow::schema::parquet_to_arrow_schema(
+                            &required_schema_descriptor,
+                            None,
+                        )
+                        .unwrap(),
+                    );
+                    println!("required_schema_arrow: {:?}", required_schema_arrow);

-                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+                    assert!(!required_schema_arrow.fields.is_empty());

-                // Convert the Spark expressions to Physical expressions
-                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
-                    .data_filters
-                    .iter()
-                    .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
-                    .collect();
+                    let mut projection_vector: Vec<usize> =
+                        Vec::with_capacity(required_schema_arrow.fields.len());
+                    // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
+                    required_schema_arrow.fields.iter().for_each(|field| {
+                        projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
+                    });
+                    println!("projection_vector: {:?}", projection_vector);

-                // Create a conjunctive form of the vector because ParquetExecBuilder takes
-                // a single expression
-                let data_filters = data_filters?;
-                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
-                    Arc::new(BinaryExpr::new(
-                        left,
-                        datafusion::logical_expr::Operator::And,
-                        right,
-                    ))
-                });
+                    assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

-                println!("data_filters: {:?}", data_filters);
-                println!("test_data_filters: {:?}", test_data_filters);
+                    // Convert the Spark expressions to Physical expressions
+                    let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                        .data_filters
+                        .iter()
+                        .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
+                        .collect();
+
+                    // Create a conjunctive form of the vector because ParquetExecBuilder takes
+                    // a single expression
+                    let data_filters = data_filters?;
+                    let test_data_filters =
+                        data_filters.clone().into_iter().reduce(|left, right| {
+                            Arc::new(BinaryExpr::new(
+                                left,
+                                datafusion::logical_expr::Operator::And,
+                                right,
+                            ))
+                        });
+
+                    println!("data_filters: {:?}", data_filters);
+                    println!("test_data_filters: {:?}", test_data_filters);
+
+                    let object_store_url = ObjectStoreUrl::local_filesystem();
+                    let paths: Vec<Url> = scan
+                        .path
+                        .iter()
+                        .map(|path| Url::parse(path).unwrap())
+                        .collect();

-                let object_store_url = ObjectStoreUrl::local_filesystem();
-                let paths: Vec<Url> = scan
-                    .path
-                    .iter()
-                    .map(|path| Url::parse(path).unwrap())
-                    .collect();
+                    let object_store = object_store::local::LocalFileSystem::new();
+                    // register the object store with the runtime environment
+                    let url = Url::try_from("file://").unwrap();
+                    self.session_ctx
+                        .runtime_env()
+                        .register_object_store(&url, Arc::new(object_store));

-                let object_store = object_store::local::LocalFileSystem::new();
-                // register the object store with the runtime environment
-                let url = Url::try_from("file://").unwrap();
-                self.session_ctx
-                    .runtime_env()
-                    .register_object_store(&url, Arc::new(object_store));
+                    let files: Vec<PartitionedFile> = paths
+                        .iter()
+                        .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
+                        .collect();

-                let files: Vec<PartitionedFile> = paths
-                    .iter()
-                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
-                    .collect();
+                    // partition the files
+                    // TODO really should partition the row groups

-                // partition the files
-                // TODO really should partition the row groups
+                    let mut file_groups = vec![vec![]; partition_count];
+                    files.iter().enumerate().for_each(|(idx, file)| {
+                        file_groups[idx % partition_count].push(file.clone());
+                    });

-                let mut file_groups = vec![vec![]; partition_count];
-                files.iter().enumerate().for_each(|(idx, file)| {
-                    file_groups[idx % partition_count].push(file.clone());
-                });
+                    let file_scan_config =
+                        FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
+                            .with_file_groups(file_groups)
+                            .with_projection(Some(projection_vector));

-                let file_scan_config =
-                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                        .with_file_groups(file_groups)
-                        .with_projection(Some(projection_vector));
+                    let mut table_parquet_options = TableParquetOptions::new();
+                    table_parquet_options.global.pushdown_filters = true;
+                    table_parquet_options.global.reorder_filters = true;

-                let mut table_parquet_options = TableParquetOptions::new();
-                table_parquet_options.global.pushdown_filters = true;
-                table_parquet_options.global.reorder_filters = true;
+                    let mut builder = ParquetExecBuilder::new(file_scan_config)
+                        .with_table_parquet_options(table_parquet_options);

-                let mut builder = ParquetExecBuilder::new(file_scan_config)
-                    .with_table_parquet_options(table_parquet_options);
+                    if let Some(filter) = test_data_filters {
+                        builder = builder.with_predicate(filter);
+                    }

-                if let Some(filter) = test_data_filters {
-                    builder = builder.with_predicate(filter);
+                    let scan = builder.build();
+                    return Ok((vec![], Arc::new(scan)));
                 }

-                let scan = builder.build();
-                return Ok((vec![], Arc::new(scan)));
-            }
-            OpStruct::Scan(scan) => {
-                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
-
                 // If it is not test execution context for unit test, we should have at least one
                 // input source
                 if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
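
The projection logic in this hunk maps every column of the required (projected) schema to its ordinal in the full data schema, so the Parquet reader only materializes the requested columns. Below is a minimal standalone sketch of that mapping using just the arrow-schema crate; the schema and column names are invented for illustration.

use arrow_schema::{DataType, Field, Schema};

/// For every field in `required_schema`, look up its position in `data_schema`.
/// Mirrors the `index_of` loop above; the unwrap assumes the required schema is
/// always a subset of the data schema.
fn projection_vector(data_schema: &Schema, required_schema: &Schema) -> Vec<usize> {
    required_schema
        .fields()
        .iter()
        .map(|field| data_schema.index_of(field.name()).unwrap())
        .collect()
}

fn main() {
    let data_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Int64, true),
    ]);
    let required_schema = Schema::new(vec![Field::new("c", DataType::Int64, true)]);
    // Prints [2]: column "c" sits at index 2 of the full schema.
    println!("{:?}", projection_vector(&data_schema, &required_schema));
}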
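
The reduce over BinaryExpr with Operator::And collapses the list of pushed-down data filters into a single conjunctive predicate, since (as the comment in the hunk says) ParquetExecBuilder takes a single expression. The same fold is sketched here with a toy expression type, not DataFusion's PhysicalExpr, so the resulting left-nested AND tree is easy to see.

// Toy stand-in for a physical expression; not DataFusion's type.
#[derive(Debug)]
enum Expr {
    Pred(&'static str),
    And(Box<Expr>, Box<Expr>),
}

// Fold a list of predicates into one left-nested AND; returns None for an empty list.
fn conjunction(filters: Vec<Expr>) -> Option<Expr> {
    filters
        .into_iter()
        .reduce(|left, right| Expr::And(Box::new(left), Box::new(right)))
}

fn main() {
    let filters = vec![
        Expr::Pred("a > 1"),
        Expr::Pred("b IS NOT NULL"),
        Expr::Pred("c < 10"),
    ];
    // Prints And(And(Pred("a > 1"), Pred("b IS NOT NULL")), Pred("c < 10"))
    println!("{:?}", conjunction(filters).unwrap());
}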
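
File-to-partition assignment in the hunk is plain round-robin: file i lands in group i % partition_count (the TODO notes that splitting by row group would balance work better). A std-only sketch of the same assignment, with placeholder file names:

fn assign_file_groups(files: &[String], partition_count: usize) -> Vec<Vec<String>> {
    // One (possibly empty) group per output partition.
    let mut file_groups: Vec<Vec<String>> = vec![Vec::new(); partition_count];
    for (idx, file) in files.iter().enumerate() {
        file_groups[idx % partition_count].push(file.clone());
    }
    file_groups
}

fn main() {
    let files: Vec<String> = (0..5).map(|i| format!("part-{i}.parquet")).collect();
    // With 2 partitions: files 0, 2, 4 land in group 0 and files 1, 3 in group 1.
    println!("{:?}", assign_file_groups(&files, 2));
}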

native/proto/src/proto/operator.proto

Lines changed: 0 additions & 9 deletions
@@ -43,7 +43,6 @@ message Operator {
     SortMergeJoin sort_merge_join = 108;
     HashJoin hash_join = 109;
     Window window = 110;
-    NativeScan native_scan = 111;
   }
 }

@@ -53,14 +52,6 @@ message Scan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
-}
-
-message NativeScan {
-  repeated spark.spark_expression.DataType fields = 1;
-  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
-  // is purely for informational purposes when viewing native query plans in
-  // debug mode.
-  string source = 2;
   repeated string path = 3;
   string required_schema = 4;
   string data_schema = 5;
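
The required_schema and data_schema fields kept on the Scan message hold Parquet message-type strings, which the native planner parses with parse_message_type before converting them to Arrow schemas (see the planner.rs hunk above). A small sketch of parsing such a string with the parquet crate; the schema text is an invented example.

use parquet::schema::parser::parse_message_type;

fn main() {
    // A made-up schema written in Parquet's message-type syntax.
    let data_schema = "
        message spark_schema {
            required int32 a;
            optional binary b (UTF8);
        }
    ";
    // parse_message_type returns the parsed parquet Type tree (or an error for bad syntax).
    let parsed = parse_message_type(data_schema).unwrap();
    println!("{:?}", parsed);
}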

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 1 addition & 18 deletions
@@ -189,22 +189,6 @@ class CometSparkSessionExtensions
      }

      // data source V1
-      case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
-            _: Seq[_],
-            requiredSchema,
-            _,
-            _,
-            _,
-            _,
-            _,
-            _)
-          if CometNativeScanExec.isSchemaSupported(requiredSchema)
-            && CometNativeScanExec.isSchemaSupported(partitionSchema)
-            && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
-        logInfo("Comet extension enabled for v1 Scan")
-        CometNativeScanExec(scanExec, session)
-      // data source V1
       case scanExec @ FileSourceScanExec(
             HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
             _: Seq[_],
@@ -1221,8 +1205,7 @@ object CometSparkSessionExtensions extends Logging {
   }

   def isCometScan(op: SparkPlan): Boolean = {
-    op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
-      op.isInstanceOf[CometNativeScanExec]
+    op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
   }

   private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
