This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit b35b8a6

Resolve to return the nested state along with field name
1 parent 4f8e392 commit b35b8a6

File tree

10 files changed: +137 −95 lines changed

src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala

Lines changed: 41 additions & 31 deletions
@@ -16,8 +16,10 @@
 
 package com.microsoft.hyperspace.actions
 
+import scala.collection.immutable.ListMap
+
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.functions.{col, input_file_name}
 
@@ -104,23 +106,6 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     }
   }
 
-  private def hasParquetAsSourceFormatProperty(
-      relation: FileBasedRelation): Option[(String, String)] = {
-    if (relation.hasParquetAsSourceFormat) {
-      Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
-    } else {
-      None
-    }
-  }
-
-  private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
-    if (hasLineage(spark)) {
-      Some(IndexConstants.LINEAGE_PROPERTY -> "true")
-    } else {
-      None
-    }
-  }
-
   protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
     val numBuckets = numBucketsForIndex(spark)
 
@@ -156,9 +141,26 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     relations.head
   }
 
+  private def hasParquetAsSourceFormatProperty(
+      relation: FileBasedRelation): Option[(String, String)] = {
+    if (relation.hasParquetAsSourceFormat) {
+      Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
+    } else {
+      None
+    }
+  }
+
+  private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
+    if (hasLineage(spark)) {
+      Some(IndexConstants.LINEAGE_PROPERTY -> "true")
+    } else {
+      None
+    }
+  }
+
   private def resolveConfig(
       df: DataFrame,
-      indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
+      indexConfig: IndexConfig): (ListMap[String, Boolean], ListMap[String, Boolean]) = {
     val spark = df.sparkSession
     val plan = df.queryExecution.analyzed
     val indexedColumns = indexConfig.indexedColumns
@@ -170,7 +172,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       case (Some(indexed), Some(included)) => (indexed, included)
       case _ =>
         val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan)))
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.keys)))
          .collect { case c if c._2.isEmpty => c._1 }
        throw HyperspaceException(
          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
@@ -183,10 +185,12 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       df: DataFrame,
       indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = {
     val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
-    val columnsFromIndexConfig = resolvedIndexedColumns ++ resolvedIncludedColumns
+    val columnsFromIndexConfig =
+      resolvedIndexedColumns.keys.toSeq ++ resolvedIncludedColumns.keys.toSeq
 
-    val escapedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
-    val escapedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
+    val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
+    val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
+    val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns
 
     val indexDF = if (hasLineage(spark)) {
       val relation = getRelation(spark, df)
@@ -215,19 +219,25 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       val dataPathColumn = "_data_path"
       val lineagePairs = relation.lineagePairs(fileIdTracker)
       val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
+      val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns
 
       df.withColumn(dataPathColumn, input_file_name())
        .join(lineageDF.hint("broadcast"), dataPathColumn)
-        .select(
-          allIndexColumns.head,
-          allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
-        .toDF(escapedIndexedColumns ++ escapedIncludedColumns ++ missingPartitionColumns :+
-          IndexConstants.DATA_FILE_NAME_ID: _*)
+        .select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+
+          col(IndexConstants.DATA_FILE_NAME_ID): _*)
     } else {
-      df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
-        .toDF(escapedIndexedColumns ++ escapedIncludedColumns : _*)
+      df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*)
    }
 
-    (indexDF, escapedIndexedColumns, escapedIncludedColumns)
+    (indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
+  }
+
+  private def prepareColumns(
+      originalColumns: Seq[String],
+      prefixedColumns: Seq[String]): Seq[Column] = {
+    originalColumns.zip(prefixedColumns).map {
+      case (original, prefixed) =>
+        col(original).as(prefixed)
+    }
   }
 }
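The new prepareColumns helper replaces the earlier head/tail select followed by a toDF rename: each original column expression is aliased directly to its prefixed index name. A minimal standalone sketch of that aliasing (the column names are hypothetical; only the Spark col/as API already used in the diff is involved):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.col

    // Pair each resolved column name with its prefixed index name and alias it,
    // mirroring prepareColumns above.
    val originalColumns = Seq("nested.leaf.cnt", "Date")
    val prefixedColumns = Seq("__hs_nested.nested.leaf.cnt", "Date")

    val prepared: Seq[Column] = originalColumns.zip(prefixedColumns).map {
      case (original, prefixed) => col(original).as(prefixed)
    }
    // The result can then be passed to df.select(prepared: _*), as write(...) now does.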

src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala

Lines changed: 2 additions & 2 deletions
@@ -93,8 +93,8 @@ private[actions] abstract class RefreshActionBase(
     val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns
     IndexConfig(
       previousIndexLogEntry.name,
-      SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed),
-      SchemaUtils.removePrefixNestedFieldNames(ddColumns.included))
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed).keys.toSeq,
+      SchemaUtils.removePrefixNestedFieldNames(ddColumns.included).keys.toSeq)
   }
 
   final override val transientState: String = REFRESHING
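Because removePrefixNestedFieldNames now returns a map of field name to nested state, the refresh path keeps only the keys when rebuilding the IndexConfig. A small sketch of that step, using hypothetical stored column names:

    // Hypothetical output of SchemaUtils.removePrefixNestedFieldNames for stored columns.
    val indexed: Map[String, Boolean] = Map("nested.leaf.cnt" -> true, "Date" -> false)
    val included: Map[String, Boolean] = Map("nested.leaf.id" -> true)

    // Only the plain field names are needed to rebuild the IndexConfig for a refresh.
    val indexedColumns: Seq[String] = indexed.keys.toSeq
    val includedColumns: Seq[String] = included.keys.toSeq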

src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala

Lines changed: 6 additions & 6 deletions
@@ -23,7 +23,6 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshIncrementalActionEvent}
-import com.microsoft.hyperspace.util.SchemaUtils
 
 /**
  * Action to refresh indexes with newly appended files and deleted files in an incremental way.
@@ -92,7 +91,8 @@ class RefreshIncrementalAction(
         refreshDF,
         indexDataPath.toString,
         previousIndexLogEntry.numBuckets,
-        SchemaUtils.prefixNestedFieldNames(indexConfig.indexedColumns),
+        // should contain the resolved and prefixed field names
+        previousIndexLogEntry.derivedDataset.properties.columns.indexed,
         writeMode)
     }
   }
@@ -116,10 +116,6 @@ class RefreshIncrementalAction(
     }
   }
 
-  override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
-    RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
-  }
-
   /**
   * Create a log entry with all source data files, and all required index content. This contains
   * ALL source data files (files which were indexed previously, and files which are being indexed
@@ -144,4 +140,8 @@
       entry
     }
   }
+
+  override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
+    RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+  }
 }

src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala

Lines changed: 0 additions & 4 deletions
@@ -557,10 +557,6 @@ case class IndexLogEntry(
     config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
   }
 
-  def usesNestedFields: Boolean = {
-    SchemaUtils.containsNestedFieldNames(indexedColumns ++ includedColumns)
-  }
-
   /**
   * A mutable map for holding auxiliary information of this index log entry while applying rules.
   */

src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala

Lines changed: 2 additions & 2 deletions
@@ -305,8 +305,8 @@ object JoinIndexRule
     val rRequiredIndexedCols = lRMap.values.toSeq
 
     // All required columns resolved with base relation.
-    val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get
-    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get
+    val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.keys.toSeq
+    val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get.keys.toSeq
 
     // Make sure required indexed columns are subset of all required columns for a subplan
     require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined)

src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala

Lines changed: 15 additions & 7 deletions
@@ -16,6 +16,8 @@
 
 package com.microsoft.hyperspace.util
 
+import scala.collection.immutable.ListMap
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetArrayStructFields, GetMapValue, GetStructField}
@@ -84,12 +86,13 @@ object ResolverUtils {
   * @param spark Spark session.
   * @param requiredStrings List of strings to resolve.
   * @param plan Logical plan to resolve against.
-   * @return Optional Seq of resolved strings if all required strings are resolved. Else, None.
+   * @return Optional ListMap of tuples of resolved name string and nested state boolean
+   *         if all required strings are resolved. Else, None.
   */
  def resolve(
      spark: SparkSession,
      requiredStrings: Seq[String],
-      plan: LogicalPlan): Option[Seq[String]] = {
+      plan: LogicalPlan): Option[ListMap[String, Boolean]] = {
    val schema = plan.schema
    val resolver = spark.sessionState.conf.resolver
    val resolved = requiredStrings.map { requiredField =>
@@ -98,11 +101,16 @@
        .map { expr =>
          val resolvedColNameParts = extractColumnName(expr)
          validateResolvedColumnName(requiredField, resolvedColNameParts)
-          getColumnNameFromSchema(schema, resolvedColNameParts, resolver).mkString(".")
+          getColumnNameFromSchema(schema, resolvedColNameParts, resolver)
+            .foldLeft(("", false)) { (acc, i) =>
+              val name = Seq(acc._1, i._1).filter(_.nonEmpty).mkString(".")
+              val isNested = acc._2 || i._2
+              (name, isNested)
+            }
        }
        .getOrElse { return None }
    }
-    Some(resolved)
+    Some(ListMap(resolved: _*))
  }
 
  // Extracts the parts of a nested field access path from an expression.
@@ -138,19 +146,19 @@
  private def getColumnNameFromSchema(
      schema: StructType,
      resolvedColNameParts: Seq[String],
-      resolver: Resolver): Seq[String] = resolvedColNameParts match {
+      resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match {
    case h :: tail =>
      val field = schema.find(f => resolver(f.name, h)).get
      field match {
        case StructField(name, s: StructType, _, _) =>
-          name +: getColumnNameFromSchema(s, tail, resolver)
+          (name, true) +: getColumnNameFromSchema(s, tail, resolver)
        case StructField(_, _: ArrayType, _, _) =>
          // TODO: Nested arrays will be supported later
          throw HyperspaceException("Array types are not supported.")
        case StructField(_, _: MapType, _, _) =>
          // TODO: Nested maps will be supported later
          throw HyperspaceException("Map types are not supported")
-        case f => Seq(f.name)
+        case f => Seq((f.name, false))
      }
  }
 }
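The fold introduced in resolve collapses the per-part (name, isNested) pairs returned by getColumnNameFromSchema into one dotted column name plus an overall nested flag. A self-contained sketch of just that fold, assuming a hypothetical field nested.leaf.cnt whose struct levels are marked true and whose leaf field is false:

    // Parts as getColumnNameFromSchema would produce them for nested.leaf.cnt.
    val parts: Seq[(String, Boolean)] = Seq(("nested", true), ("leaf", true), ("cnt", false))

    val (name, isNested) = parts.foldLeft(("", false)) { (acc, part) =>
      val joined = Seq(acc._1, part._1).filter(_.nonEmpty).mkString(".")
      (joined, acc._2 || part._2)
    }
    // name == "nested.leaf.cnt", isNested == true; a top-level column such as
    // Seq(("Date", false)) folds to ("Date", false).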

src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala

Lines changed: 28 additions & 11 deletions
@@ -16,14 +16,17 @@
 
 package com.microsoft.hyperspace.util
 
+import scala.collection.immutable.ListMap
+
 object SchemaUtils {
 
   val NESTED_FIELD_PREFIX = "__hs_nested."
   val NESTED_FIELD_PREFIX_REGEX = "^__hs_nested\\."
 
   /**
-   * The method prefixes a nested field name. The field name must be
-   * nested (it should contain a `.`).
+   * The method prefixes a nested field name that hasn't already been prefixed.
+   * The field name must be nested (it should contain a `.` and its type
+   * should be of [[org.apache.spark.sql.types.StructType]]).
   *
   * The inverse operation is [[removePrefixNestedFieldName]].
   *
@@ -39,16 +42,23 @@ object SchemaUtils {
   }
 
   /**
-   * The method prefixes the nested field names from a collection. The field names
-   * that are not nested will not be changed.
+   * The method prefixes the nested field names from a map where the keys are
+   * the field names and the values are the nested state of that field
+   * which should be the result of [[ResolverUtils.resolve]].
+   * The field names that are not marked as nested will not be changed.
   *
-   * See [[prefixNestedFieldName]] method.
+   * See [[prefixNestedFieldName]] and [[ResolverUtils.resolve]] methods.
   *
-   * @param fieldNames The collection of field names to prefix.
+   * @param fieldNames A [[ListMap]] of field names and nested status.
   * @return A collection with prefixed nested fields.
   */
-  def prefixNestedFieldNames(fieldNames: Seq[String]): Seq[String] = {
-    fieldNames.map(prefixNestedFieldName)
+  def prefixNestedFieldNames(fieldNames: ListMap[String, Boolean]): Seq[String] = {
+    fieldNames.map {
+      case (fieldName, true) =>
+        prefixNestedFieldName(fieldName)
+      case (fieldName, false) =>
+        fieldName
+    }.toSeq
   }
 
   /**
@@ -66,14 +76,21 @@ object SchemaUtils {
 
   /**
   * The method removes the prefix from a collection of prefixed nested field names.
+   * It returns the original ListMap of field names and nested state.
   *
   * The inverse operation is [[prefixNestedFieldNames]].
   *
   * @param fieldNames The collection of prefixed field names.
-   * @return A collection with original nested field names.
+   * @return A [[ListMap]] of field names and nested status.
   */
-  def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[String] = {
-    fieldNames.map(removePrefixNestedFieldName)
+  def removePrefixNestedFieldNames(fieldNames: Seq[String]): Map[String, Boolean] = {
+    fieldNames.map { fieldName =>
+      if (SchemaUtils.isFieldNamePrefixed(fieldName)) {
+        removePrefixNestedFieldName(fieldName) -> true
+      } else {
+        fieldName -> false
+      }
+    }.toMap
   }
 
   /**
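Taken together, prefixNestedFieldNames now consumes the name -> nested-state ListMap produced by ResolverUtils.resolve, and removePrefixNestedFieldNames reconstructs that state from the stored names. A standalone sketch of the round trip, assuming the __hs_nested. prefix from NESTED_FIELD_PREFIX and hypothetical column names:

    import scala.collection.immutable.ListMap

    val prefix = "__hs_nested."  // SchemaUtils.NESTED_FIELD_PREFIX

    // As resolveConfig would hand them over: field name -> nested state.
    val resolved = ListMap("nested.leaf.cnt" -> true, "Date" -> false)

    // prefixNestedFieldNames: only fields marked as nested receive the prefix.
    val prefixed: Seq[String] = resolved.map {
      case (fieldName, true)  => prefix + fieldName
      case (fieldName, false) => fieldName
    }.toSeq                                    // Seq("__hs_nested.nested.leaf.cnt", "Date")

    // removePrefixNestedFieldNames: strip the prefix and restore the nested flag.
    val restored: Map[String, Boolean] = prefixed.map { fieldName =>
      if (fieldName.startsWith(prefix)) fieldName.stripPrefix(prefix) -> true
      else fieldName -> false
    }.toMap                                    // Map("nested.leaf.cnt" -> true, "Date" -> false)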

src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala

Lines changed: 7 additions & 4 deletions
@@ -16,6 +16,7 @@
 
 package com.microsoft.hyperspace.index
 
+import scala.collection.immutable.ListMap
 import scala.collection.mutable.WrappedArray
 
 import org.apache.hadoop.conf.Configuration
@@ -159,8 +160,9 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
       // should be added to index schema if they are not already among index config columns.
       assert(
         indexRecordsDF.schema.fieldNames.sorted ===
-          (SchemaUtils.prefixNestedFieldNames(indexConfig2.indexedColumns ++
-            indexConfig2.includedColumns) ++
+          (SchemaUtils.prefixNestedFieldNames(
+            ListMap(indexConfig2.indexedColumns.zip(Seq(true)): _*) ++
+              ListMap(indexConfig2.includedColumns.zip(Seq(true)): _*)) ++
            Seq(IndexConstants.DATA_FILE_NAME_ID) ++ partitionKeys).sorted)
     }
   }
@@ -174,8 +176,9 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
       // For non-partitioned data, only file name lineage column should be added to index schema.
       assert(
         indexRecordsDF.schema.fieldNames.sorted ===
-          (SchemaUtils.prefixNestedFieldNames(indexConfig1.indexedColumns ++
-            indexConfig1.includedColumns) ++
+          (SchemaUtils.prefixNestedFieldNames(
+            ListMap(indexConfig1.indexedColumns.zip(Seq(true)): _*) ++
+              ListMap(indexConfig1.includedColumns.zip(Seq(false, true)): _*)) ++
            Seq(IndexConstants.DATA_FILE_NAME_ID)).sorted)
     }
   }
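The tests build the new ListMap argument by zipping each configured column list with its expected nested flags. A brief illustration of that construction with hypothetical test columns:

    import scala.collection.immutable.ListMap

    // Hypothetical index config columns for a nested-field test.
    val indexedColumns = Seq("nested.leaf.id")
    val includedColumns = Seq("Date", "nested.leaf.cnt")

    // zip pairs each name with its expected nested state; ListMap preserves the order.
    val indexedMap = ListMap(indexedColumns.zip(Seq(true)): _*)
    val includedMap = ListMap(includedColumns.zip(Seq(false, true)): _*)

    // SchemaUtils.prefixNestedFieldNames(indexedMap ++ includedMap) then yields the
    // prefixed names the index schema is expected to contain.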

0 commit comments