
Commit 38e32f7

wip - CometNativeScan (#1076)
* wip - CometNativeScan
* fix and make config internal
1 parent ad46821 commit 38e32f7

File tree

9 files changed: +716 additions, -148 deletions


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
@@ -77,6 +77,16 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
+  val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
+    "spark.comet.native.scan.enabled")
+    .internal()
+    .doc(
+      "Whether to enable the fully native scan. When this is turned on, Spark will use Comet to " +
+        "read supported data sources (currently only Parquet is supported natively)." +
+        " By default, this config is true.")
+    .booleanConf
+    .createWithDefault(true)
+
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
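
The new entry is marked internal and defaults to true, but it behaves like any other Spark conf. A minimal sketch of toggling it from a session (not part of this commit; `spark` is assumed to be an active SparkSession, e.g. in spark-shell):

// Fall back to the existing CometScanExec-based reader (hypothetical usage):
spark.conf.set("spark.comet.native.scan.enabled", "false")

// Re-enable the fully native scan:
spark.conf.set("spark.comet.native.scan.enabled", "true")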

native/core/src/execution/datafusion/planner.rs

Lines changed: 92 additions & 94 deletions
@@ -948,118 +948,116 @@ impl PhysicalPlanner {
                     Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
                 ))
             }
-            OpStruct::Scan(scan) => {
+            OpStruct::NativeScan(scan) => {
                 let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
 
-                if scan.source == "CometScan parquet (unknown)" {
-                    let data_schema = parse_message_type(&scan.data_schema).unwrap();
-                    let required_schema = parse_message_type(&scan.required_schema).unwrap();
-                    println!("data_schema: {:?}", data_schema);
-                    println!("required_schema: {:?}", required_schema);
-
-                    let data_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
-                    let data_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &data_schema_descriptor,
-                            None,
-                        )
-                        .unwrap(),
-                    );
-                    println!("data_schema_arrow: {:?}", data_schema_arrow);
-
-                    let required_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
-                    let required_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &required_schema_descriptor,
-                            None,
-                        )
+                println!("NATIVE: SCAN: {:?}", scan);
+                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
+                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
+                println!("data_schema: {:?}", data_schema);
+                println!("required_schema: {:?}", required_schema);
+
+                let data_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
+                let data_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
                         .unwrap(),
-                    );
-                    println!("required_schema_arrow: {:?}", required_schema_arrow);
+                );
+                println!("data_schema_arrow: {:?}", data_schema_arrow);
+
+                let required_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                let required_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(
+                        &required_schema_descriptor,
+                        None,
+                    )
+                    .unwrap(),
+                );
+                println!("required_schema_arrow: {:?}", required_schema_arrow);
 
-                    assert!(!required_schema_arrow.fields.is_empty());
+                assert!(!required_schema_arrow.fields.is_empty());
 
-                    let mut projection_vector: Vec<usize> =
-                        Vec::with_capacity(required_schema_arrow.fields.len());
-                    // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                    required_schema_arrow.fields.iter().for_each(|field| {
-                        projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                    });
-                    println!("projection_vector: {:?}", projection_vector);
+                let mut projection_vector: Vec<usize> =
+                    Vec::with_capacity(required_schema_arrow.fields.len());
+                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
+                required_schema_arrow.fields.iter().for_each(|field| {
+                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
+                });
+                println!("projection_vector: {:?}", projection_vector);
 
-                    assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
 
-                    // Convert the Spark expressions to Physical expressions
-                    let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
-                        .data_filters
-                        .iter()
-                        .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
-                        .collect();
-
-                    // Create a conjunctive form of the vector because ParquetExecBuilder takes
-                    // a single expression
-                    let data_filters = data_filters?;
-                    let test_data_filters =
-                        data_filters.clone().into_iter().reduce(|left, right| {
-                            Arc::new(BinaryExpr::new(
-                                left,
-                                datafusion::logical_expr::Operator::And,
-                                right,
-                            ))
-                        });
-
-                    println!("data_filters: {:?}", data_filters);
-                    println!("test_data_filters: {:?}", test_data_filters);
-
-                    let object_store_url = ObjectStoreUrl::local_filesystem();
-                    let paths: Vec<Url> = scan
-                        .path
-                        .iter()
-                        .map(|path| Url::parse(path).unwrap())
-                        .collect();
+                // Convert the Spark expressions to Physical expressions
+                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                    .data_filters
+                    .iter()
+                    .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
+                    .collect();
 
-                    let object_store = object_store::local::LocalFileSystem::new();
-                    // register the object store with the runtime environment
-                    let url = Url::try_from("file://").unwrap();
-                    self.session_ctx
-                        .runtime_env()
-                        .register_object_store(&url, Arc::new(object_store));
+                // Create a conjunctive form of the vector because ParquetExecBuilder takes
+                // a single expression
+                let data_filters = data_filters?;
+                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+                    Arc::new(BinaryExpr::new(
+                        left,
+                        datafusion::logical_expr::Operator::And,
+                        right,
+                    ))
+                });
 
-                    let files: Vec<PartitionedFile> = paths
-                        .iter()
-                        .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
-                        .collect();
+                println!("data_filters: {:?}", data_filters);
+                println!("test_data_filters: {:?}", test_data_filters);
 
-                    // partition the files
-                    // TODO really should partition the row groups
+                let object_store_url = ObjectStoreUrl::local_filesystem();
+                let paths: Vec<Url> = scan
+                    .path
+                    .iter()
+                    .map(|path| Url::parse(path).unwrap())
+                    .collect();
 
-                    let mut file_groups = vec![vec![]; partition_count];
-                    files.iter().enumerate().for_each(|(idx, file)| {
-                        file_groups[idx % partition_count].push(file.clone());
-                    });
+                let object_store = object_store::local::LocalFileSystem::new();
+                // register the object store with the runtime environment
+                let url = Url::try_from("file://").unwrap();
+                self.session_ctx
+                    .runtime_env()
+                    .register_object_store(&url, Arc::new(object_store));
 
-                    let file_scan_config =
-                        FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                            .with_file_groups(file_groups)
-                            .with_projection(Some(projection_vector));
+                let files: Vec<PartitionedFile> = paths
+                    .iter()
+                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
+                    .collect();
 
-                    let mut table_parquet_options = TableParquetOptions::new();
-                    table_parquet_options.global.pushdown_filters = true;
-                    table_parquet_options.global.reorder_filters = true;
+                // partition the files
+                // TODO really should partition the row groups
 
-                    let mut builder = ParquetExecBuilder::new(file_scan_config)
-                        .with_table_parquet_options(table_parquet_options);
+                let mut file_groups = vec![vec![]; partition_count];
+                files.iter().enumerate().for_each(|(idx, file)| {
+                    file_groups[idx % partition_count].push(file.clone());
+                });
 
-                    if let Some(filter) = test_data_filters {
-                        builder = builder.with_predicate(filter);
-                    }
+                let file_scan_config =
+                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
+                        .with_file_groups(file_groups)
+                        .with_projection(Some(projection_vector));
 
-                    let scan = builder.build();
-                    return Ok((vec![], Arc::new(scan)));
+                let mut table_parquet_options = TableParquetOptions::new();
+                table_parquet_options.global.pushdown_filters = true;
+                table_parquet_options.global.reorder_filters = true;
+
+                let mut builder = ParquetExecBuilder::new(file_scan_config)
+                    .with_table_parquet_options(table_parquet_options);
+
+                if let Some(filter) = test_data_filters {
+                    builder = builder.with_predicate(filter);
                 }
 
+                let scan = builder.build();
+                return Ok((vec![], Arc::new(scan)));
+            }
+            OpStruct::Scan(scan) => {
+                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
+
                 // If it is not test execution context for unit test, we should have at least one
                 // input source
                 if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
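
Two pieces of the new NativeScan arm are easy to miss in the diff: the list of data_filters is reduced into a single conjunctive predicate (because ParquetExecBuilder takes one expression), and the discovered files are dealt out to file groups round-robin by idx % partition_count. A small self-contained Scala sketch of both ideas, using plain strings in place of DataFusion's PhysicalExpr and PartitionedFile (illustration only, not Comet code):

object NativeScanSketch {
  def main(args: Array[String]): Unit = {
    // 1. Conjunctive form: fold a list of predicates into a single AND expression,
    //    mirroring the reduce over BinaryExpr(_, Operator::And, _) above.
    val filters = Seq("a > 1", "b IS NOT NULL", "c = 'x'")
    val conjunction = filters.reduceOption((l, r) => s"($l) AND ($r)")
    println(conjunction.getOrElse("<no predicate>"))

    // 2. Round-robin file groups: file i goes to group (i % partitionCount),
    //    mirroring `file_groups[idx % partition_count].push(file.clone())` above.
    val paths = (0 until 5).map(i => s"part-$i.parquet")
    val partitionCount = 2
    val fileGroups = Array.fill(partitionCount)(scala.collection.mutable.Buffer.empty[String])
    paths.zipWithIndex.foreach { case (p, idx) => fileGroups(idx % partitionCount) += p }
    fileGroups.zipWithIndex.foreach { case (g, i) => println(s"partition $i -> ${g.mkString(", ")}") }
  }
}

The Rust arm additionally registers a LocalFileSystem object store with the session runtime and enables pushdown_filters and reorder_filters in TableParquetOptions before building the ParquetExec.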

native/proto/src/proto/operator.proto

Lines changed: 9 additions & 0 deletions
@@ -43,6 +43,7 @@ message Operator {
     SortMergeJoin sort_merge_join = 108;
     HashJoin hash_join = 109;
     Window window = 110;
+    NativeScan native_scan = 111;
   }
 }
 
@@ -52,6 +53,14 @@ message Scan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
+}
+
+message NativeScan {
+  repeated spark.spark_expression.DataType fields = 1;
+  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
+  // is purely for informational purposes when viewing native query plans in
+  // debug mode.
+  string source = 2;
   repeated string path = 3;
   string required_schema = 4;
   string data_schema = 5;
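
On the JVM side this message would be populated through the generated protobuf builders. A hedged sketch, assuming Comet's usual Java protobuf codegen and an outer class named OperatorOuterClass (both assumptions, not verified against this commit); method names follow standard protobuf builder naming for the fields above:

import org.apache.comet.serde.OperatorOuterClass // assumed generated class

val nativeScan = OperatorOuterClass.NativeScan
  .newBuilder()
  .setSource("CometNativeScan parquet")                             // informational only
  .addPath("file:///tmp/part-0.parquet")                            // illustrative path
  .setRequiredSchema("message spark_schema { required int32 id; }") // Parquet MessageType text
  .setDataSchema("message spark_schema { required int32 id; optional binary name; }")
  .build()

val op = OperatorOuterClass.Operator
  .newBuilder()
  .setNativeScan(nativeScan) // fills the new oneof field, tag 111
  .build()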

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 18 additions & 1 deletion
@@ -189,6 +189,22 @@ class CometSparkSessionExtensions
         }
 
         // data source V1
+        case scanExec @ FileSourceScanExec(
+              HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+              _: Seq[_],
+              requiredSchema,
+              _,
+              _,
+              _,
+              _,
+              _,
+              _)
+            if CometNativeScanExec.isSchemaSupported(requiredSchema)
+              && CometNativeScanExec.isSchemaSupported(partitionSchema)
+              && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+          logInfo("Comet extension enabled for v1 Scan")
+          CometNativeScanExec(scanExec, session)
+        // data source V1
         case scanExec @ FileSourceScanExec(
               HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
               _: Seq[_],
@@ -1205,7 +1221,8 @@
   }
 
   def isCometScan(op: SparkPlan): Boolean = {
-    op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
+    op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
+    op.isInstanceOf[CometNativeScanExec]
   }
 
   private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
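
With the config enabled, the new case rewrites an eligible v1 Parquet FileSourceScanExec into CometNativeScanExec. A hypothetical spark-shell check (not from this commit; the input path is illustrative) that the rule fired:

val df = spark.read.parquet("/tmp/part-0.parquet") // hypothetical input
val plan = df.queryExecution.executedPlan
val usedNativeScan = plan.collect {
  case op if op.getClass.getSimpleName == "CometNativeScanExec" => op
}.nonEmpty
println(s"CometNativeScanExec used: $usedNativeScan")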
