This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit a34762b

New approach by using resolve method of a plan
1 parent acd3ced commit a34762b

12 files changed, +371 -459 lines changed


src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala

Lines changed: 6 additions & 18 deletions
@@ -19,13 +19,12 @@ package com.microsoft.hyperspace.actions
 import scala.util.Try

 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.types.StructType

 import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.telemetry.{AppInfo, CreateActionEvent, HyperspaceEvent}
-import com.microsoft.hyperspace.util.{ResolverUtils, SchemaUtils}
+import com.microsoft.hyperspace.util.ResolverUtils

 class CreateAction(
     spark: SparkSession,
@@ -50,7 +49,7 @@ class CreateAction(
     }

     // schema validity checks
-    if (!isValidIndexSchema(indexConfig, df.schema)) {
+    if (!isValidIndexSchema(indexConfig, df)) {
       throw HyperspaceException("Index config is not applicable to dataframe schema.")
     }

@@ -64,24 +63,13 @@ class CreateAction(
     }
   }

-  private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
-    // First we flatten the schema. Instead of having struct of leaves
-    // the flatten method will return a list of field names.
-    // The second step is escaping the field names as there are some problems when
-    // using field names with dots. One is `partitionBy` does not works well
-    // with field names that contains the `.` (dot). See more on this Apache Spark
-    // ticket: https://issues.apache.org/jira/browse/SPARK-18084. Other is doing
-    // encountering `AnalysisException: Cannot resolve column name...` exceptions.
-    // So, given `struct(nested, struct(nst, struct(field1)))`, the fields variable
-    // will contain `Seq("nested__nst__field1")`.
-    val fields = SchemaUtils.escapeFieldNames(SchemaUtils.flatten(schema))
-    // Resolve index config columns from available column names present in the schema.
+  private def isValidIndexSchema(config: IndexConfig, dataFrame: DataFrame): Boolean = {
+    // Resolve index config columns from available column names present in the dataframe.
     ResolverUtils
       .resolve(
         spark,
-        SchemaUtils.escapeFieldNames(config.indexedColumns)
-          ++ SchemaUtils.escapeFieldNames(config.includedColumns),
-        fields)
+        config.indexedColumns ++ config.includedColumns,
+        dataFrame.queryExecution.analyzed)
       .isDefined
   }
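For illustration, a minimal sketch of the new validation path: the index config columns (nested ones given in dotted form) are resolved against the DataFrame's analyzed plan instead of a flattened, escaped schema. The sample data, object name, and column names below are hypothetical; ResolverUtils.resolve is the plan-based overload added in this commit.

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.index.IndexConfig
import com.microsoft.hyperspace.util.ResolverUtils

object ValidateIndexConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Hypothetical source with schema: id INT, nested STRUCT<nst: STRUCT<field1: INT>>.
    val df = Seq((1, 10), (2, 20))
      .toDF("id", "field1")
      .selectExpr("id", "named_struct('nst', named_struct('field1', field1)) AS nested")

    val config = IndexConfig("idx", Seq("nested.nst.field1"), Seq("id"))

    // Mirrors CreateAction.isValidIndexSchema after this change:
    // resolve config columns directly against the analyzed plan.
    val isValid = ResolverUtils
      .resolve(spark, config.indexedColumns ++ config.includedColumns, df.queryExecution.analyzed)
      .isDefined

    println(s"Index config applicable: $isValid") // true for this schema
    spark.stop()
  }
}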

src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala

Lines changed: 17 additions & 20 deletions
@@ -103,7 +103,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

     // run job
     val repartitionedIndexDataFrame =
-      indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"$c")): _*)
+      indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"`$c`")): _*)

     // Save the index with the number of buckets specified.
     repartitionedIndexDataFrame.write
@@ -144,7 +144,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
   }

   private def usesNestedFieldsProperty(indexConfig: IndexConfig): Option[(String, String)] = {
-    if (SchemaUtils.hasNestedFields(indexConfig.indexedColumns ++ indexConfig.includedColumns)) {
+    if (SchemaUtils.containsNestedFieldNames(indexConfig.indexedColumns ++
+        indexConfig.includedColumns)) {
       Some(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true")
     } else {
       None
@@ -155,26 +156,22 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       df: DataFrame,
       indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
     val spark = df.sparkSession
-    // Flatten will transform nested field names from `struct(nested, struct(nst, struct(field1)))`
-    // into `Seq("nested.nst.field1")`.
-    val dfColumnNames = SchemaUtils.flatten(df.schema)
-    // The index config will contain the field names as they are given with `.` (dots).
-    // (ie: `Seq("nested.nst.field")`) and they need to be escaped to `nested__nst__field1` as
-    // the index entry know to work with escaped values only.
-    val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)
-    val includedColumns = SchemaUtils.unescapeFieldNames(indexConfig.includedColumns)
-    val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
-    val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)
+    val plan = df.queryExecution.analyzed
+    val indexedColumns = indexConfig.indexedColumns
+    val includedColumns = indexConfig.includedColumns
+    val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
+    val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)

     (resolvedIndexedColumns, resolvedIncludedColumns) match {
-      case (Some(indexed), Some(included)) => (indexed, included)
+      case (Some(indexed), Some(included)) =>
+        (indexed, included)
       case _ =>
         val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, c, dfColumnNames)))
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan)))
           .collect { case c if c._2.isEmpty => c._1 }
         throw HyperspaceException(
           s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
-            s"from available source columns '${dfColumnNames.mkString(",")}'")
+            s"from available source columns:\n${df.schema.treeString}")
     }
   }

@@ -218,15 +215,15 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
         .select(
           allIndexColumns.head,
           allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
-        .toDF(
-          SchemaUtils.escapeFieldNames(allIndexColumns) :+ IndexConstants.DATA_FILE_NAME_ID: _*)
+        .toDF(SchemaUtils.prefixNestedFieldNames(allIndexColumns) :+
+          IndexConstants.DATA_FILE_NAME_ID: _*)
     } else {
       df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
-        .toDF(SchemaUtils.escapeFieldNames(columnsFromIndexConfig): _*)
+        .toDF(SchemaUtils.prefixNestedFieldNames(columnsFromIndexConfig): _*)
     }

-    val escapedIndexedColumns = SchemaUtils.escapeFieldNames(resolvedIndexedColumns)
-    val escapedIncludedColumns = SchemaUtils.escapeFieldNames(resolvedIncludedColumns)
+    val escapedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
+    val escapedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)

     (indexDF, escapedIndexedColumns, escapedIncludedColumns)
   }
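The change from col(s"$c") to col(s"`$c`") matters because resolved index column names can contain dots (nested fields keep a dotted form); without backticks Spark parses a dotted name as struct-field access and fails to resolve it against the index DataFrame. A small, self-contained sketch of the distinction, using a made-up DataFrame and column names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object BacktickColumnNameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // A top-level column whose name literally contains dots, as produced for nested fields.
    val indexDataFrame = Seq((1, "a"), (2, "b")).toDF("id", "nested.nst.field1")

    // col("nested.nst.field1") would be parsed as struct access (nested -> nst -> field1)
    // and throw an AnalysisException here, since no such struct exists.

    // Backtick-quoting treats the whole string as a single column name, which is what
    // repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"`$c`")): _*) relies on.
    indexDataFrame.repartition(2, col("`nested.nst.field1`")).show()
    spark.stop()
  }
}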

src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala

Lines changed: 5 additions & 1 deletion
@@ -22,6 +22,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
 import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
 import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.util.SchemaUtils

 /**
  * Base abstract class containing common code for different types of index refresh actions.
@@ -86,7 +87,10 @@ private[actions] abstract class RefreshActionBase(

   protected lazy val indexConfig: IndexConfig = {
     val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns
-    IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
+    IndexConfig(
+      previousIndexLogEntry.name,
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed),
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.included))
   }

   final override val transientState: String = REFRESHING
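Refresh rebuilds the user-facing IndexConfig from the column names stored in the previous index log entry, so removePrefixNestedFieldNames is expected to undo prefixNestedFieldNames; the exact marker used for nested names lives in SchemaUtils and is not shown in this diff. A hedged round-trip sketch under that assumption:

import com.microsoft.hyperspace.util.SchemaUtils

object NestedNamePrefixRoundTrip {
  def main(args: Array[String]): Unit = {
    // Column names as a user passes them to IndexConfig (nested fields in dotted form).
    val userFacing = Seq("id", "nested.nst.field1")

    // Stored form: nested names are marked by prefixNestedFieldNames when the index is created.
    val stored = SchemaUtils.prefixNestedFieldNames(userFacing)

    // RefreshActionBase relies on this round trip when it rebuilds the IndexConfig.
    assert(SchemaUtils.removePrefixNestedFieldNames(stored) == userFacing)
    println("Round trip holds for: " + userFacing.mkString(", "))
  }
}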

src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshIncrementalActionEvent}
+import com.microsoft.hyperspace.util.SchemaUtils

 /**
  * Action to refresh indexes with newly appended files and deleted files in an incremental way.
@@ -90,7 +91,7 @@ class RefreshIncrementalAction(
       refreshDF,
       indexDataPath.toString,
       previousIndexLogEntry.numBuckets,
-      indexConfig.indexedColumns,
+      SchemaUtils.prefixNestedFieldNames(indexConfig.indexedColumns),
       writeMode)
   }
 }

src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala

Lines changed: 2 additions & 2 deletions
@@ -305,8 +305,8 @@ object JoinIndexRule
     val rRequiredIndexedCols = lRMap.values.toSeq

     // All required columns resolved with base relation.
-    val lRequiredAllCols = resolve(spark, allRequiredCols(left), lBaseAttrs).get
-    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rBaseAttrs).get
+    val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get
+    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get

     // Make sure required indexed columns are subset of all required columns for a subplan
     require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined)

src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala

Lines changed: 69 additions & 0 deletions
@@ -18,6 +18,11 @@ package com.microsoft.hyperspace.util

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GetStructField}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.HyperspaceException

 /**
  * [[ResolverUtils]] provides utility functions to resolve strings based on spark's resolver.
@@ -71,4 +76,68 @@ object ResolverUtils {
       availableStrings: Seq[String]): Option[Seq[String]] = {
     Some(requiredStrings.map(resolve(spark, _, availableStrings).getOrElse { return None }))
   }
+
+  /**
+   * Finds all resolved strings for requiredStrings, from the given logical plan. Returns
+   * optional seq of resolved strings if all required strings are resolved, otherwise None.
+   *
+   * @param spark Spark session.
+   * @param requiredStrings List of strings to resolve.
+   * @param plan Logical plan to resolve against.
+   * @return Optional Seq of resolved strings if all required strings are resolved. Else, None.
+   */
+  def resolve(
+      spark: SparkSession,
+      requiredStrings: Seq[String],
+      plan: LogicalPlan): Option[Seq[String]] = {
+
+    def fixParts(parts: Seq[String], outputs: Seq[Attribute]): Seq[String] = {
+      var newParts = Seq.empty[String]
+      val h :: t = parts.toList
+      val topLevelField = outputs.find(_.name.compareToIgnoreCase(h) == 0)
+      topLevelField match {
+        case Some(o) =>
+          newParts = newParts :+ o.name
+          var children = o.dataType.asInstanceOf[StructType]
+          t.foreach { e =>
+            val elIdx: Int = children.fieldNames.indexWhere { f => f.compareToIgnoreCase(e) == 0 }
+            if (elIdx >= 0) {
+              newParts = newParts :+ children.fieldNames(elIdx)
+              children.fields.toSeq(elIdx).dataType match {
+                case s: StructType =>
+                  children = s
+                case _ =>
+              }
+            }
+          }
+        case None =>
+          newParts = h +: t
+      }
+      newParts
+    }
+
+    val schemaFieldNames = plan.output.map(_.name)
+    Some(requiredStrings.map { requiredField =>
+      plan
+        .resolveQuoted(requiredField, spark.sessionState.conf.resolver)
+        .getOrElse { return None }
+        .collectFirst {
+          case a: AttributeReference =>
+            schemaFieldNames.find(_.compareToIgnoreCase(a.name) == 0).getOrElse(a.name)
+          case g: GetStructField =>
+            val parts = g.sql // returns backtick enclosed string (ie: `a`.`b`.`c`)
+              .replaceAll("^`(.*)`$", "$1") // strip first and last backticks
+              .split("`\\.`") // split by "`.`"
+            val fixedParts = fixParts(parts, plan.output) // fix the casing
+            val resolvedFieldName = fixedParts.mkString(".") // put back the field name fixed
+            val unsupported = parts.filter(_.contains("."))
+            if (unsupported.nonEmpty) { // the fields with dots should throw exception
+              throw HyperspaceException(s"The following field name construct " +
+                s"$resolvedFieldName contains unsupported parts: ${unsupported.mkString(",")}.")
+            }
+            resolvedFieldName
+        }
+        .getOrElse { return None }
+    })
+  }
 }
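A usage sketch of the new plan-based resolve overload (the sample schema and names are illustrative): it resolves both top-level and nested field names against a DataFrame's analyzed plan, returns them with the casing found in the plan, and yields None if any name cannot be resolved.

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.util.ResolverUtils

object PlanBasedResolveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Schema: id INT, nested STRUCT<nst: STRUCT<field1: INT>>.
    val df = Seq((1, 10))
      .toDF("id", "field1")
      .selectExpr("id", "named_struct('nst', named_struct('field1', field1)) AS nested")
    val plan = df.queryExecution.analyzed

    // Case-insensitive lookups come back with the casing defined in the plan.
    println(ResolverUtils.resolve(spark, Seq("ID", "NESTED.NST.FIELD1"), plan))
    // expected: Some(List(id, nested.nst.field1))

    // Any unresolvable name makes the whole call return None.
    println(ResolverUtils.resolve(spark, Seq("id", "missing"), plan))
    // expected: None

    spark.stop()
  }
}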
