This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit 7900f1b

Resolve to return the nested state along with field name
1 parent 4f8e392 commit 7900f1b

10 files changed, +140 -104 lines changed
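
The central change: ResolverUtils.resolve now reports, for every resolved column, whether it is a nested field, and the callers below are updated to carry that flag around. A minimal sketch of the new shape (the column names here are illustrative, not taken from the commit):

    // resolve(spark, Seq("id", "nested.leaf"), df.queryExecution.analyzed)
    //   => Some(Seq(("id", false), ("nested.leaf", true)))   // (resolved name, isNested)
    // Any unresolvable column makes the whole result None.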

src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala

Lines changed: 40 additions & 32 deletions

@@ -17,7 +17,7 @@
 package com.microsoft.hyperspace.actions

 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.functions.{col, input_file_name}

@@ -104,23 +104,6 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     }
   }

-  private def hasParquetAsSourceFormatProperty(
-      relation: FileBasedRelation): Option[(String, String)] = {
-    if (relation.hasParquetAsSourceFormat) {
-      Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
-    } else {
-      None
-    }
-  }
-
-  private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
-    if (hasLineage(spark)) {
-      Some(IndexConstants.LINEAGE_PROPERTY -> "true")
-    } else {
-      None
-    }
-  }
-
   protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
     val numBuckets = numBucketsForIndex(spark)

@@ -156,9 +139,26 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     relations.head
   }

+  private def hasParquetAsSourceFormatProperty(
+      relation: FileBasedRelation): Option[(String, String)] = {
+    if (relation.hasParquetAsSourceFormat) {
+      Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
+    } else {
+      None
+    }
+  }
+
+  private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
+    if (hasLineage(spark)) {
+      Some(IndexConstants.LINEAGE_PROPERTY -> "true")
+    } else {
+      None
+    }
+  }
+
   private def resolveConfig(
       df: DataFrame,
-      indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
+      indexConfig: IndexConfig): (Seq[(String, Boolean)], Seq[(String, Boolean)]) = {
     val spark = df.sparkSession
     val plan = df.queryExecution.analyzed
     val indexedColumns = indexConfig.indexedColumns

@@ -170,8 +170,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       case (Some(indexed), Some(included)) => (indexed, included)
       case _ =>
         val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan)))
-          .collect { case c if c._2.isEmpty => c._1 }
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_._1))))
+          .collect { case (c, r) if r.isEmpty => c }
         throw HyperspaceException(
           s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
             s"from available source columns:\n${df.schema.treeString}")

@@ -183,10 +183,12 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       df: DataFrame,
       indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = {
     val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
-    val columnsFromIndexConfig = resolvedIndexedColumns ++ resolvedIncludedColumns
+    val columnsFromIndexConfig =
+      resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1)

-    val escapedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
-    val escapedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
+    val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
+    val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
+    val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns

     val indexDF = if (hasLineage(spark)) {
       val relation = getRelation(spark, df)

@@ -215,19 +217,25 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       val dataPathColumn = "_data_path"
       val lineagePairs = relation.lineagePairs(fileIdTracker)
       val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
+      val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns

       df.withColumn(dataPathColumn, input_file_name())
         .join(lineageDF.hint("broadcast"), dataPathColumn)
-        .select(
-          allIndexColumns.head,
-          allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
-        .toDF(escapedIndexedColumns ++ escapedIncludedColumns ++ missingPartitionColumns :+
-          IndexConstants.DATA_FILE_NAME_ID: _*)
+        .select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+
+          col(IndexConstants.DATA_FILE_NAME_ID): _*)
     } else {
-      df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
-        .toDF(escapedIndexedColumns ++ escapedIncludedColumns : _*)
+      df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*)
     }

-    (indexDF, escapedIndexedColumns, escapedIncludedColumns)
+    (indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
+  }
+
+  private def prepareColumns(
+      originalColumns: Seq[String],
+      prefixedColumns: Seq[String]): Seq[Column] = {
+    originalColumns.zip(prefixedColumns).map {
+      case (original, prefixed) =>
+        col(original).as(prefixed)
+    }
   }
 }
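
The new prepareColumns helper replaces the earlier head/tail select followed by toDF renaming: each source column is selected and aliased to its (possibly prefixed) index column name in one pass. A rough illustration with hypothetical column names:

    // Hypothetical inputs, for illustration only.
    val originalColumns = Seq("id", "nested.leaf")
    val prefixedColumns = Seq("id", "__hs_nested.nested.leaf")
    // prepareColumns zips them into aliased Columns:
    //   Seq(col("id").as("id"), col("nested.leaf").as("__hs_nested.nested.leaf"))
    // so df.select(prepareColumns(originalColumns, prefixedColumns): _*)
    // projects and renames in a single select.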

src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala

Lines changed: 4 additions & 2 deletions

@@ -93,8 +93,10 @@ private[actions] abstract class RefreshActionBase(
     val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns
     IndexConfig(
       previousIndexLogEntry.name,
-      SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed),
-      SchemaUtils.removePrefixNestedFieldNames(ddColumns.included))
+      // As indexed & included columns in previousLogEntry are resolved & prefixed names,
+      // need to remove the prefix to resolve with the dataframe for refresh.
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed).map(_._1),
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.included).map(_._1))
   }

   final override val transientState: String = REFRESHING
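
Because the stored index columns are already resolved and prefixed, the refresh path strips the prefix and drops the nested flag before rebuilding the IndexConfig. Roughly, with a hypothetical prefixed name:

    // SchemaUtils.removePrefixNestedFieldNames(Seq("__hs_nested.nested.leaf", "id"))
    //   => Seq(("nested.leaf", true), ("id", false))
    // .map(_._1) then keeps only the plain names for the refreshed IndexConfig:
    //   Seq("nested.leaf", "id")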

src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala

Lines changed: 6 additions & 6 deletions

@@ -23,7 +23,6 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshIncrementalActionEvent}
-import com.microsoft.hyperspace.util.SchemaUtils

 /**
  * Action to refresh indexes with newly appended files and deleted files in an incremental way.

@@ -92,7 +91,8 @@ class RefreshIncrementalAction(
       refreshDF,
       indexDataPath.toString,
       previousIndexLogEntry.numBuckets,
-      SchemaUtils.prefixNestedFieldNames(indexConfig.indexedColumns),
+      // previousIndexLogEntry should contain the resolved and prefixed field names.
+      previousIndexLogEntry.derivedDataset.properties.columns.indexed,
       writeMode)
   }
 }

@@ -116,10 +116,6 @@ class RefreshIncrementalAction(
     }
   }

-  override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
-    RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
-  }
-
   /**
    * Create a log entry with all source data files, and all required index content. This contains
    * ALL source data files (files which were indexed previously, and files which are being indexed

@@ -144,4 +140,8 @@ class RefreshIncrementalAction(
      entry
    }
  }
+
+  override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
+    RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+  }
 }

src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala

Lines changed: 0 additions & 4 deletions

@@ -557,10 +557,6 @@ case class IndexLogEntry(
     config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
   }

-  def usesNestedFields: Boolean = {
-    SchemaUtils.containsNestedFieldNames(indexedColumns ++ includedColumns)
-  }
-
   /**
    * A mutable map for holding auxiliary information of this index log entry while applying rules.
    */

src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala

Lines changed: 2 additions & 2 deletions

@@ -305,8 +305,8 @@ object JoinIndexRule
     val rRequiredIndexedCols = lRMap.values.toSeq

     // All required columns resolved with base relation.
-    val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get
-    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get
+    val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.map(_._1)
+    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get.map(_._1)

     // Make sure required indexed columns are subset of all required columns for a subplan
     require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined)

src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala

Lines changed: 14 additions & 8 deletions

@@ -84,12 +84,13 @@ object ResolverUtils {
    * @param spark Spark session.
    * @param requiredStrings List of strings to resolve.
    * @param plan Logical plan to resolve against.
-   * @return Optional Seq of resolved strings if all required strings are resolved. Else, None.
+   * @return Optional sequence of tuples of resolved name string and nested state boolean
+   *         if all required strings are resolved. Else, None.
    */
   def resolve(
       spark: SparkSession,
       requiredStrings: Seq[String],
-      plan: LogicalPlan): Option[Seq[String]] = {
+      plan: LogicalPlan): Option[Seq[(String, Boolean)]] = {
     val schema = plan.schema
     val resolver = spark.sessionState.conf.resolver
     val resolved = requiredStrings.map { requiredField =>

@@ -98,7 +99,12 @@ object ResolverUtils {
         .map { expr =>
           val resolvedColNameParts = extractColumnName(expr)
           validateResolvedColumnName(requiredField, resolvedColNameParts)
-          getColumnNameFromSchema(schema, resolvedColNameParts, resolver).mkString(".")
+          getColumnNameFromSchema(schema, resolvedColNameParts, resolver)
+            .foldLeft(("", false)) { (acc, i) =>
+              val name = Seq(acc._1, i._1).filter(_.nonEmpty).mkString(".")
+              val isNested = acc._2 || i._2
+              (name, isNested)
+            }
         }
         .getOrElse { return None }
     }

@@ -110,8 +116,8 @@ object ResolverUtils {
     expr match {
       case a: Attribute =>
         Seq(a.name)
-      case g @ GetStructField(_, _, Some(name)) =>
-        extractColumnName(g.child) :+ name
+      case _ @ GetStructField(child, _, Some(name)) =>
+        extractColumnName(child) :+ name
       case _: GetArrayStructFields =>
         // TODO: Nested arrays will be supported later
         throw HyperspaceException("Array types are not supported.")

@@ -138,19 +144,19 @@ object ResolverUtils {
   private def getColumnNameFromSchema(
       schema: StructType,
       resolvedColNameParts: Seq[String],
-      resolver: Resolver): Seq[String] = resolvedColNameParts match {
+      resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match {
     case h :: tail =>
       val field = schema.find(f => resolver(f.name, h)).get
       field match {
         case StructField(name, s: StructType, _, _) =>
-          name +: getColumnNameFromSchema(s, tail, resolver)
+          (name, true) +: getColumnNameFromSchema(s, tail, resolver)
         case StructField(_, _: ArrayType, _, _) =>
           // TODO: Nested arrays will be supported later
           throw HyperspaceException("Array types are not supported.")
         case StructField(_, _: MapType, _, _) =>
           // TODO: Nested maps will be supported later
           throw HyperspaceException("Map types are not supported")
-        case f => Seq(f.name)
+        case f => Seq((f.name, false))
       }
   }
 }
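
The foldLeft re-assembles the dotted name from the parts returned by getColumnNameFromSchema and ORs their nested flags, so a column counts as nested if any ancestor is a struct. A small self-contained sketch of the fold (the parts shown are hypothetical):

    // Hypothetical parts for a struct column "nested" with leaf field "leaf".
    val parts = Seq(("nested", true), ("leaf", false))
    val resolved = parts.foldLeft(("", false)) { (acc, i) =>
      (Seq(acc._1, i._1).filter(_.nonEmpty).mkString("."), acc._2 || i._2)
    }
    // resolved == ("nested.leaf", true); a top-level column folds to (name, false).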

src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala

Lines changed: 26 additions & 11 deletions

@@ -22,8 +22,9 @@ object SchemaUtils {
   val NESTED_FIELD_PREFIX_REGEX = "^__hs_nested\\."

   /**
-   * The method prefixes a nested field name. The field name must be
-   * nested (it should contain a `.`).
+   * The method prefixes a nested field name that hasn't already been prefixed.
+   * The field name must be nested (it should contain a `.` and its type
+   * should be of [[org.apache.spark.sql.types.StructType]]).
    *
    * The inverse operation is [[removePrefixNestedFieldName]].
    *

@@ -39,16 +40,23 @@ object SchemaUtils {
   }

   /**
-   * The method prefixes the nested field names from a collection. The field names
-   * that are not nested will not be changed.
+   * The method prefixes the nested field names from a map where the keys are
+   * the field names and the values are the nested state of that field
+   * which should be the result of [[ResolverUtils.resolve]].
+   * The field names that are not marked as nested will not be changed.
    *
-   * See [[prefixNestedFieldName]] method.
+   * See [[prefixNestedFieldName]] and [[ResolverUtils.resolve]] methods.
    *
-   * @param fieldNames The collection of field names to prefix.
+   * @param fieldNames A sequence of tuples of field names and nested status.
    * @return A collection with prefixed nested fields.
    */
-  def prefixNestedFieldNames(fieldNames: Seq[String]): Seq[String] = {
-    fieldNames.map(prefixNestedFieldName)
+  def prefixNestedFieldNames(fieldNames: Seq[(String, Boolean)]): Seq[String] = {
+    fieldNames.map {
+      case (fieldName, true) =>
+        prefixNestedFieldName(fieldName)
+      case (fieldName, false) =>
+        fieldName
+    }
   }

   /**

@@ -66,14 +74,21 @@ object SchemaUtils {

   /**
    * The method removes the prefix from a collection of prefixed nested field names.
+   * It returns the original sequence of tuples of field names and nested state.
    *
    * The inverse operation is [[prefixNestedFieldNames]].
    *
    * @param fieldNames The collection of prefixed field names.
-   * @return A collection with original nested field names.
+   * @return A sequence of tuples of field names and nested status.
    */
-  def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[String] = {
-    fieldNames.map(removePrefixNestedFieldName)
+  def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[(String, Boolean)] = {
+    fieldNames.map { fieldName =>
+      if (SchemaUtils.isFieldNamePrefixed(fieldName)) {
+        removePrefixNestedFieldName(fieldName) -> true
+      } else {
+        fieldName -> false
+      }
+    }
   }

   /**
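
With these changes, prefixNestedFieldNames and removePrefixNestedFieldNames act as inverses over (name, isNested) tuples; only fields flagged as nested receive the __hs_nested. prefix. For example (illustrative names):

    val resolved = Seq(("nested.leaf", true), ("id", false))
    SchemaUtils.prefixNestedFieldNames(resolved)
    //   => Seq("__hs_nested.nested.leaf", "id")
    SchemaUtils.removePrefixNestedFieldNames(Seq("__hs_nested.nested.leaf", "id"))
    //   => Seq(("nested.leaf", true), ("id", false))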

src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala

Lines changed: 7 additions & 4 deletions

@@ -16,6 +16,7 @@

 package com.microsoft.hyperspace.index

+import scala.collection.immutable.ListMap
 import scala.collection.mutable.WrappedArray

 import org.apache.hadoop.conf.Configuration

@@ -159,8 +160,9 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
       // should be added to index schema if they are not already among index config columns.
       assert(
         indexRecordsDF.schema.fieldNames.sorted ===
-          (SchemaUtils.prefixNestedFieldNames(indexConfig2.indexedColumns ++
-            indexConfig2.includedColumns) ++
+          (SchemaUtils.prefixNestedFieldNames(
+            indexConfig2.indexedColumns.zip(Seq(true)) ++
+              indexConfig2.includedColumns.zip(Seq(true))) ++
             Seq(IndexConstants.DATA_FILE_NAME_ID) ++ partitionKeys).sorted)
     }
   }

@@ -174,8 +176,9 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
       // For non-partitioned data, only file name lineage column should be added to index schema.
       assert(
         indexRecordsDF.schema.fieldNames.sorted ===
-          (SchemaUtils.prefixNestedFieldNames(indexConfig1.indexedColumns ++
-            indexConfig1.includedColumns) ++
+          (SchemaUtils.prefixNestedFieldNames(
+            indexConfig1.indexedColumns.zip(Seq(true)) ++
+              indexConfig1.includedColumns.zip(Seq(false, true))) ++
             Seq(IndexConstants.DATA_FILE_NAME_ID)).sorted)
     }
   }
