This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit ba8a684

Support hybrid scans when filtering on nested fields with index

1 parent 15b7377 commit ba8a684

File tree

4 files changed (+500, -21 lines)

src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala

Lines changed: 3 additions & 3 deletions
@@ -74,7 +74,7 @@ object PlanUtils {
    * contains the given name.
    */
  def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = {
-    val splits = name.split(".")
+    val splits = name.split("\\.")
    val expFound = exp.find {
      case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true
      case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true
@@ -119,7 +119,7 @@ object PlanUtils {
    * @return A Spark Catalyst [[AttributeReference]] pointing to the field name.
    */
  def extractAttributeRef(exp: Expression, name: String): AttributeReference = {
-    val splits = name.split(".")
+    val splits = name.split("\\.")
    val elem = exp.find {
      case a: AttributeReference if splits.contains(a.name) => true
      case _ => false
@@ -136,7 +136,7 @@ object PlanUtils {
    * @return A Spark SQL [[DataType]] of the given field name.
    */
  def extractTypeFromExpression(exp: Expression, name: String): DataType = {
-    val splits = name.split(".")
+    val splits = name.split("\\.")
    val elem = exp.flatMap {
      case a: AttributeReference =>
        if (splits.forall(s => a.name == s)) {
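The PlanUtils change fixes a subtle regex bug: String.split takes a regular expression, so splitting a nested field name on an unescaped "." matches every character and yields an empty array, while "\\." splits on literal dots. A minimal standalone sketch (the field name is made up for illustration):

// String.split interprets its argument as a regex, so "." matches every
// character and nothing survives once trailing empty strings are dropped.
object SplitSketch extends App {
  val name = "nested.leaf.id"               // hypothetical nested field name
  println(name.split(".").length)           // 0  -- the unescaped dot matches everything
  println(name.split("\\.").mkString(", ")) // nested, leaf, id -- literal dot
}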

src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala

Lines changed: 46 additions & 14 deletions
@@ -280,7 +280,10 @@ object RuleUtils {
        Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

    val flatSchema =
-      ResolverUtils.resolve(spark, index.indexedColumns ++ index.includedColumns, relation.plan)
+      ResolverUtils.resolve(
+        spark,
+        index.indexedColumns ++ index.includedColumns,
+        relation.plan)
    // SchemaUtils.escapeFieldNames(SchemaUtils.flatten(relation.plan.schema))
    val updatedOutput =
      if (flatSchema.isDefined && SchemaUtils.containsNestedFieldNames(flatSchema.get)) {
@@ -294,8 +297,8 @@
        }
      } else {
        relation.plan.output
-        .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
-        .map(_.asInstanceOf[AttributeReference])
+          .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
+          .map(_.asInstanceOf[AttributeReference])
      }
    relation.createLogicalRelation(indexFsRelation, updatedOutput)

@@ -328,7 +331,7 @@
      useBucketSpec: Boolean,
      useBucketUnionForAppended: Boolean): LogicalPlan = {
    val provider = Hyperspace.getContext(spark).sourceProviderManager
-    var unhandledAppendedFiles: Seq[Path] = Nil
+    var unhandledAppendedFiles = Seq.empty[Path]
    // Get transformed plan with index data and appended files if applicable.
    val indexPlan = plan transformUp {
      // Use transformUp here as currently one relation is allowed (pre-requisite).
@@ -367,7 +370,7 @@

        val filesToRead = {
          if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
-            relation.partitionSchema.nonEmpty) {
+            relation.partitionSchema.nonEmpty || index.usesNestedFields) {
            // Since the index data is in "parquet" format, we cannot read source files
            // in formats other than "parquet" using one FileScan node as the operator requires
            // files in one homogenous format. To address this, we need to read the appended
@@ -391,10 +394,17 @@
        // In order to handle deleted files, read index data with the lineage column so that
        // we could inject Filter-Not-In conditions on the lineage column to exclude the indexed
        // rows from the deleted files.
+        val flatSchema = ResolverUtils.resolve(
+          spark,
+          SchemaUtils.removePrefixNestedFieldNames(index.indexedColumns ++ index.includedColumns),
+          relation.plan)
        val newSchema = StructType(
-          index.schema.filter(s =>
-            relation.plan.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
-              IndexConstants.DATA_FILE_NAME_ID))))
+          index.schema.filter(
+            s =>
+              (flatSchema.isDefined && SchemaUtils
+                .prefixNestedFieldNames(flatSchema.get)
+                .contains(s.name)) ||
+                (filesDeleted.nonEmpty && s.name.equals(IndexConstants.DATA_FILE_NAME_ID))))

        def fileIndex: InMemoryFileIndex = {
          new InMemoryFileIndex(spark, filesToRead, Map(), None)
@@ -414,9 +424,22 @@
          new ParquetFileFormat,
          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

-        val updatedOutput = relation.plan.output
-          .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
-          .map(_.asInstanceOf[AttributeReference])
+        val updatedOutput =
+          if (flatSchema.isDefined && SchemaUtils.containsNestedFieldNames(
+                SchemaUtils.prefixNestedFieldNames(flatSchema.get))) {
+            indexFsRelation.schema.flatMap { s =>
+              val exprId = getFieldPosition(index, s.name)
+              relation.plan.output.find(a => s.name.contains(a.name)).map { a =>
+                AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
+                  ExprId(exprId),
+                  a.qualifier)
+              }
+            }
+          } else {
+            relation.plan.output
+              .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
+              .map(_.asInstanceOf[AttributeReference])
+          }

        if (filesDeleted.isEmpty) {
          relation.createLogicalRelation(indexFsRelation, updatedOutput)
@@ -428,6 +451,12 @@
          val filterForDeleted = Filter(Not(In(lineageAttr, deletedFileIds)), rel)
          Project(updatedOutput, OptimizeIn(filterForDeleted))
        }
+      case p: Project if provider.isSupportedProject(p) =>
+        transformProject(p, index)
+
+      case f: Filter if provider.isSupportedFilter(f) =>
+        transformFilter(f, index)
+
    }

    if (unhandledAppendedFiles.nonEmpty) {
@@ -501,11 +530,14 @@
    // Set the same output schema with the index plan to merge them using BucketUnion.
    // Include partition columns for data loading.
    val partitionColumns = relation.partitionSchema.map(_.name)
-    val updatedSchema = StructType(relation.plan.schema.filter(col =>
-      index.schema.contains(col) || relation.partitionSchema.contains(col)))
+    val updatedSchema = StructType(
+      relation.plan.schema.filter(col =>
+        index.schema.fieldNames.exists(n => n.contains(col.name)) ||
+          relation.partitionSchema.contains(col)))
    val updatedOutput = relation.plan.output
      .filter(attr =>
-        index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
+        index.schema.fieldNames.exists(n => n.contains(attr.name)) ||
+          partitionColumns.contains(attr.name))
      .map(_.asInstanceOf[AttributeReference])
    val newRelation = relation.createHadoopFsRelation(
      newLocation,
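In the hybrid-scan branch above, when the index covers nested fields the rewritten relation rebuilds its output by mapping each flattened index column back to an attribute of the original plan, reusing the source attribute's nullability, metadata and qualifier and taking the ExprId from the field's position in the index. A rough, self-contained sketch of that mapping with Spark Catalyst types (runs in spark-shell); the prefixed column name nested__leaf__id and the position-based ExprId are illustrative assumptions, not the index's actual encoding:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical flattened index schema: the nested field nested.leaf.id is
// assumed to be stored under a prefixed top-level name.
val indexSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("nested__leaf__id", IntegerType)))

// Source plan output as it would appear before the rewrite.
val planOutput = Seq(
  AttributeReference("name", StringType)(),
  AttributeReference("id", IntegerType)())

// Mirror the diff's idea: for every index column, find a source attribute whose
// name is contained in the flattened column name and rebuild the reference with
// the index column's name and type but the source attribute's properties.
val updatedOutput = indexSchema.zipWithIndex.flatMap { case (s, pos) =>
  planOutput.find(a => s.name.contains(a.name)).map { a =>
    AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
      ExprId(pos), // stand-in for getFieldPosition(index, s.name)
      a.qualifier)
  }
}

The new index.usesNestedFields term in filesToRead forces appended source files to be read in a separate plan, presumably because the index stores nested fields as flattened columns and the appended files' original schema can no longer share one FileScan node with the index data.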

src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala

Lines changed: 4 additions & 4 deletions
@@ -131,13 +131,13 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
    */
  def isSupportedProject(project: Project): Boolean = {
    val containsNestedFields = SchemaUtils.containsNestedFieldNames(
-      project.projectList.flatMap(extractNamesFromExpression))
+      SchemaUtils.prefixNestedFieldNames(project.projectList.flatMap(extractNamesFromExpression)))
    var containsNestedChildren = false
    project.child.foreach {
      case f: Filter =>
        containsNestedChildren = containsNestedChildren || {
-          SchemaUtils.containsNestedFieldNames(SchemaUtils.removePrefixNestedFieldNames(
-            extractNamesFromExpression(f.condition).toSeq))
+          SchemaUtils.containsNestedFieldNames(
+            SchemaUtils.prefixNestedFieldNames(extractNamesFromExpression(f.condition).toSeq))
        }
      case _ =>
    }
@@ -153,7 +153,7 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
    */
  def isSupportedFilter(filter: Filter): Boolean = {
    val containsNestedFields = SchemaUtils.containsNestedFieldNames(
-      extractNamesFromExpression(filter.condition).toSeq)
+      SchemaUtils.prefixNestedFieldNames(extractNamesFromExpression(filter.condition).toSeq))
    containsNestedFields
  }
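Both isSupportedProject and isSupportedFilter now normalize the extracted names with SchemaUtils.prefixNestedFieldNames before testing for nested fields, so the two checks agree on the same encoding. Conceptually the test asks whether any referenced column points inside a struct; a rough sketch of that predicate, where the dot-based check is only an assumption about how nested names are detected:

// Sketch only: treat a name as "nested" if it drills into a struct.
// The real SchemaUtils may mark nested names with a dedicated prefix instead.
def referencesNestedField(extractedNames: Seq[String]): Boolean =
  extractedNames.exists(_.contains("."))

referencesNestedField(Seq("id", "nested.leaf.cnt")) // true  -- touches a struct member
referencesNestedField(Seq("id", "name"))            // false -- only top-level columns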
