diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 95c0810b55f..daf9cd592ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -78,7 +78,7 @@ trait AlterDeltaTableCommand extends DeltaCommand {
   protected def checkDependentExpressions(
       sparkSession: SparkSession,
       columnParts: Seq[String],
-      newMetadata: actions.Metadata,
+      oldMetadata: actions.Metadata,
       protocol: Protocol): Unit = {
     if (!sparkSession.sessionState.conf.getConf(
         DeltaSQLConf.DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS)) {
@@ -86,14 +86,14 @@ trait AlterDeltaTableCommand extends DeltaCommand {
     }
     // check if the column to change is referenced by check constraints
     val dependentConstraints =
-      Constraints.findDependentConstraints(sparkSession, columnParts, newMetadata)
+      Constraints.findDependentConstraints(sparkSession, columnParts, oldMetadata)
     if (dependentConstraints.nonEmpty) {
       throw DeltaErrors.foundViolatingConstraintsForColumnChange(
         UnresolvedAttribute(columnParts).name, dependentConstraints)
     }
     // check if the column to change is referenced by any generated columns
     val dependentGenCols = SchemaUtils.findDependentGeneratedColumns(
-      sparkSession, columnParts, protocol, newMetadata.schema)
+      sparkSession, columnParts, protocol, oldMetadata.schema)
     if (dependentGenCols.nonEmpty) {
       throw DeltaErrors.foundViolatingGeneratedColumnsForColumnChange(
         UnresolvedAttribute(columnParts).name, dependentGenCols)
@@ -768,7 +768,7 @@ case class AlterTableDropColumnsDeltaCommand(
       configuration = newConfiguration
     )
     columnsToDrop.foreach { columnParts =>
-      checkDependentExpressions(sparkSession, columnParts, newMetadata, txn.protocol)
+      checkDependentExpressions(sparkSession, columnParts, metadata, txn.protocol)
     }
 
     txn.updateMetadata(newMetadata)
@@ -927,7 +927,7 @@ case class AlterTableChangeColumnDeltaCommand(
       if (newColumn.name != columnName) {
         // need to validate the changes if the column is renamed
         checkDependentExpressions(
-          sparkSession, columnPath :+ columnName, newMetadata, txn.protocol)
+          sparkSession, columnPath :+ columnName, metadata, txn.protocol)
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
index 11920e2220c..da3f2fbb786 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
@@ -106,7 +106,11 @@ object Constraints {
     metadata.configuration.filter {
       case (key, constraint) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
         SchemaUtils.containsDependentExpression(
-          sparkSession, columnName, constraint, sparkSession.sessionState.conf.resolver)
+          sparkSession,
+          columnName,
+          constraint,
+          metadata.schema,
+          sparkSession.sessionState.conf.resolver)
       case _ => false
     }
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
index 015dcf5033c..cbaf4fdee0e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.FileSourceGeneratedMetadataStructField
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
 /**
  * A trait that writers into Delta can extend to update the schema and/or partitioning of the table.
@@ -309,19 +309,19 @@ object ImplicitMetadataOperation {
       currentDt: DataType,
       updateDt: DataType): Unit = (currentDt, updateDt) match {
     // we explicitly ignore the check for `StructType` here.
-    case (StructType(_), StructType(_)) =>
-
-    // FIXME: we intentionally incorporate the pattern match for `ArrayType` and `MapType`
-    // here mainly due to the field paths for maps/arrays in constraints/generated columns
-    // are *NOT* consistent with regular field paths,
-    // e.g., `hash(a.arr[0].x)` vs. `hash(a.element.x)`.
-    // this makes it hard to recurse into maps/arrays and check for the corresponding
-    // fields - thus we can not actually block the operation even if the updated field
-    // is being referenced by any CHECK constraints or generated columns.
-    case (from, to) =>
+    case (_: StructType, _: StructType) =>
+    case (current: ArrayType, update: ArrayType) =>
+      checkConstraintsOrGeneratedColumnsOnStructField(
+        spark, path :+ "element", protocol, metadata, current.elementType, update.elementType)
+    case (current: MapType, update: MapType) =>
+      checkConstraintsOrGeneratedColumnsOnStructField(
+        spark, path :+ "key", protocol, metadata, current.keyType, update.keyType)
+      checkConstraintsOrGeneratedColumnsOnStructField(
+        spark, path :+ "value", protocol, metadata, current.valueType, update.valueType)
+    case (_, _) =>
       if (currentDt != updateDt) {
-        checkDependentConstraints(spark, path, metadata, from, to)
-        checkDependentGeneratedColumns(spark, path, protocol, metadata, from, to)
+        checkDependentConstraints(spark, path, metadata, currentDt, updateDt)
+        checkDependentGeneratedColumns(spark, path, protocol, metadata, currentDt, updateDt)
       }
   }
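The FIXME removed above describes the gap the rest of this patch closes: constraint and generated-column expressions address nested map/array fields with extraction syntax (e.g. `hash(a.arr[0].x)`), while column changes address the same fields with schema-path syntax (`a.element.x`), so the old check could not connect the two. As a rough illustration (not part of the patch), the reworked `SchemaUtils.containsDependentExpression` below can be exercised directly against such a mismatch; `spark` is assumed to be an active SparkSession, and the schema and expression are made up for the example:

  import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
  import org.apache.spark.sql.delta.schema.SchemaUtils
  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("a", ArrayType(StructType(Seq(StructField("x", IntegerType)))))))

  // The expression spells the field as `a[0].x`, the column under change as `a.element.x`;
  // the analyzer-based check maps one onto the other and returns true, so the caller
  // treats the column as referenced and blocks the change.
  SchemaUtils.containsDependentExpression(
    spark,
    columnToChange = Seq("a", "element", "x"),
    exprString = "a[0].x > 0",
    schema,
    caseInsensitiveResolution)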
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
index c3c724b4354..2b9b619eb20 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -35,7 +35,8 @@ import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.functions.{col, struct}
 import org.apache.spark.sql.internal.SQLConf
@@ -1269,20 +1270,58 @@ def normalizeColumnNamesInDataType(
   // identifier with back-ticks.
   def quoteIdentifier(part: String): String = s"`${part.replace("`", "``")}`"
 
+  private def analyzeExpression(
+      spark: SparkSession,
+      expr: Expression,
+      schema: StructType): Expression = {
+    // Wrap the expression in a Project over a LocalRelation so the analyzer can resolve it.
+    val relation = LocalRelation(schema)
+    val relationWithExp = Project(Seq(Alias(expr, "validate_column")()), relation)
+    val analyzedPlan = spark.sessionState.analyzer.execute(relationWithExp)
+    analyzedPlan.collectFirst {
+      case Project(Seq(a: Alias), _: LocalRelation) => a.child
+    }.get
+  }
+
   /**
-   * Will a column change, e.g., rename, need to be populated to the expression. This is true when
-   * the column to change itself or any of its descendent column is referenced by expression.
+   * Collects all attribute references in the given expression tree as a list of paths.
+   * In particular, generates paths for nested fields accessed using extraction expressions.
    * For example:
-   * - a, length(a) -> true
-   * - b, (b.c + 1) -> true, because renaming b1 will need to change the expr to (b1.c + 1).
-   * - b.c, (cast b as string) -> false, because you can change b.c to b.c1 without affecting b.
+   * - GetStructField(AttributeReference("struct"), "a") -> ["struct.a"]
+   * - Size(AttributeReference("array")) -> ["array"]
    */
-  def containsDependentExpression(
-      spark: SparkSession,
+  private def collectUsedColumns(expression: Expression): Seq[Seq[String]] = {
+    val result = new collection.mutable.ArrayBuffer[Seq[String]]()
+
+    // First, try to get the referenced column path from the child expression. If one exists,
+    // try to extend it with the current expression; once a path can no longer be extended,
+    // save it (it is then as long as possible).
+    def traverseAllPaths(exp: Expression): Option[Seq[String]] = exp match {
+      case GetStructField(child, _, Some(name)) => traverseAllPaths(child).map(_ :+ name)
+      case GetMapValue(child, key) =>
+        traverseAllPaths(key).foreach(result += _)
+        traverseAllPaths(child).map { childPath =>
+          result += childPath :+ "key"
+          childPath :+ "value"
+        }
+      case arrayExtract: GetArrayItem => traverseAllPaths(arrayExtract.child).map(_ :+ "element")
+      case arrayExtract: GetArrayStructFields =>
+        traverseAllPaths(arrayExtract.child).map(_ :+ "element" :+ arrayExtract.field.name)
+      case refCol: AttributeReference => Some(Seq(refCol.name))
+      case _ =>
+        exp.children.foreach(child => traverseAllPaths(child).foreach(result += _))
+        None
+    }
+
+    traverseAllPaths(expression).foreach(result += _)
+
+    result.toSeq
+  }
+
+  private def fallbackContainsDependentExpression(
+      expression: Expression,
       columnToChange: Seq[String],
-      exprString: String,
       resolver: Resolver): Boolean = {
-    val expression = spark.sessionState.sqlParser.parseExpression(exprString)
     expression.foreach {
       case refCol: UnresolvedAttribute =>
         // columnToChange is the referenced column or its prefix
@@ -1294,6 +1333,51 @@ def normalizeColumnNamesInDataType(
     false
   }
 
+  /**
+   * Will a column change, e.g., rename, need to be propagated to the expression? This is true when
+   * the column to change itself or any of its descendant columns is referenced by the expression.
+   * For example:
+   * - a, length(a) -> true
+   * - b, (b.c + 1) -> true, because renaming b to b1 will need to change the expr to (b1.c + 1).
+   * - b.c, (cast b as string) -> true, because changing b.c to b.c1 affects the (cast b as string) result.
+   */
+  def containsDependentExpression(
+      spark: SparkSession,
+      columnToChange: Seq[String],
+      exprString: String,
+      schema: StructType,
+      resolver: Resolver): Boolean = {
+    val expression = spark.sessionState.sqlParser.parseExpression(exprString)
+    if (spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_CHANGE_COLUMN_CHECK_DEPENDENT_EXPRESSIONS_USE_V2)) {
+      try {
+        val analyzedExpr = analyzeExpression(spark, expression, schema)
+        val exprColumns = collectUsedColumns(analyzedExpr)
+        exprColumns.exists { exprColumn =>
+          // The column change affects a column referenced by the expression only when:
+          // 1) the changed column is a prefix of the referenced column,
+          //    for example changing type of `col` affects `hash(col[0]) == 0`;
+          // 2) or the referenced column is a prefix of the changed column,
+          //    for example changing type of `col.element` affects `concat_ws('', col) == 'abc'`;
+          // 3) or they are equal.
+          exprColumn.zip(columnToChange).forall {
+            case (exprFieldName, changedFieldName) => resolver(exprFieldName, changedFieldName)
+          }
+        }
+      } catch {
+        case NonFatal(e) =>
+          deltaAssert(
+            check = false,
+            name = "containsDependentExpression.checkV2Error",
+            msg = "Exception during dependent expression V2 checking: " + e.getMessage
+          )
+          fallbackContainsDependentExpression(expression, columnToChange, resolver)
+      }
+    } else {
+      fallbackContainsDependentExpression(expression, columnToChange, resolver)
+    }
+  }
+
   /**
    * Find the unsupported data type in a table schema. Return all columns that are using unsupported
    * data types. For example,
@@ -1402,7 +1486,7 @@ def normalizeColumnNamesInDataType(
     SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
       GeneratedColumn.getGenerationExpressionStr(field.metadata).foreach { exprStr =>
         val needsToChangeExpr = SchemaUtils.containsDependentExpression(
-          sparkSession, targetColumn, exprStr, sparkSession.sessionState.conf.resolver)
+          sparkSession, targetColumn, exprStr, schema, sparkSession.sessionState.conf.resolver)
         if (needsToChangeExpr) dependentGenCols += field.name -> exprStr
       }
       field
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index fc99ea8e2b4..9b288ced4e0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1580,6 +1580,21 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_CHANGE_COLUMN_CHECK_DEPENDENT_EXPRESSIONS_USE_V2 =
+    buildConf("changeColumn.checkDependentExpressionsUseV2")
+      .internal()
+      .doc(
+        """
+          |More accurate check for altering/renaming/dropping columns that might be referenced
+          |by CHECK constraints or generated columns. Unlike the V1 checker, it handles
+          |nested arrays and maps.
+          |
+          |This is a safety switch - we should only turn this off when there is an issue with
+          |the expression checking logic that prevents a valid column change from going through.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_ALTER_TABLE_DROP_COLUMN_ENABLED =
     buildConf("alterTable.dropColumn.enabled")
       .internal()
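The new flag is internal, so it is not part of the documented surface; if the V2 check ever blocks a change that should be legal, it can be disabled per session. A sketch, not taken from the patch, assuming the usual `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends to the key:

  // Fall back to the legacy (V1) dependent-expression check for this session only.
  spark.conf.set("spark.databricks.delta.changeColumn.checkDependentExpressionsUseV2", "false")

  // ... run the ALTER TABLE statement that was being rejected ...

  // Restore the default analyzer-based (V2) check.
  spark.conf.unset("spark.databricks.delta.changeColumn.checkDependentExpressionsUseV2")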
+ |""".stripMargin) + .booleanConf + .createWithDefault(true) + val DELTA_ALTER_TABLE_DROP_COLUMN_ENABLED = buildConf("alterTable.dropColumn.enabled") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala index 7ec8a7ea5f7..8126460fcca 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala @@ -36,7 +36,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.functions._ @@ -3048,6 +3048,100 @@ class SchemaUtilsSuite extends QueryTest assert(udts.map(_.getClass.getName).toSet == Set(classOf[PointUDT].getName)) } + + test("check if column affects given dependent expressions") { + val schema = StructType(Seq( + StructField("cArray", ArrayType(StringType)), + StructField("cStruct", StructType(Seq( + StructField("cMap", MapType(IntegerType, ArrayType(BooleanType))), + StructField("cMapWithComplexKey", MapType(StructType(Seq( + StructField("a", ArrayType(StringType)), + StructField("b", BooleanType) + )), IntegerType)) + ))) + )) + assert( + SchemaUtils.containsDependentExpression( + spark, + columnToChange = Seq("cArray"), + exprString = "cast(cStruct.cMap as string) == '{}'", + schema, + caseInsensitiveResolution) === false + ) + // Extracting value from map uses key type as well. 
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cStruct", "cMap", "key"),
+        exprString = "cStruct.cMap['random_key'] == 'string'",
+        schema,
+        caseInsensitiveResolution) === true
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cstruct"),
+        exprString = "size(cStruct.cMap) == 0",
+        schema,
+        caseSensitiveResolution) === false
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cStruct", "cMap", "key"),
+        exprString = "size(cArray) == 1",
+        schema,
+        caseInsensitiveResolution) === false
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cStruct", "cMap", "key"),
+        exprString = "cStruct.cMapWithComplexKey[struct(cArray, false)] == 0",
+        schema,
+        caseInsensitiveResolution) === false
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cArray", "element"),
+        exprString = "cStruct.cMapWithComplexKey[struct(cArray, false)] == 0",
+        schema,
+        caseInsensitiveResolution) === true
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cStruct", "cMapWithComplexKey", "key", "b"),
+        exprString = "cStruct.cMapWithComplexKey[struct(cArray, false)] == 0",
+        schema,
+        caseInsensitiveResolution) === true
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("cArray", "element"),
+        exprString = "concat_ws('', cArray) == 'string'",
+        schema,
+        caseInsensitiveResolution) === true
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("CARRAY"),
+        exprString = "cArray[0] > 'a'",
+        schema,
+        caseInsensitiveResolution) === true
+    )
+    assert(
+      SchemaUtils.containsDependentExpression(
+        spark,
+        columnToChange = Seq("CARRAY", "element"),
+        exprString = "cArray[0] > 'a'",
+        schema,
+        caseSensitiveResolution) === false
+    )
+  }
 }
 
 object UnsupportedDataType extends DataType {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala
index 17e75bbb2d8..b7236c54710 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta.DeltaAnalysisException
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 
 /**
@@ -96,6 +97,39 @@ trait TypeWideningConstraintsTests { self: QueryTest with TypeWideningTestMixin
     }
   }
 
+  test("check constraint on arrays and maps with type change") {
+    withTable("t") {
+      sql("CREATE TABLE t (m map<byte, byte>, a array<byte>) USING DELTA")
+      sql("INSERT INTO t VALUES (map(1, 2, 7, -3), array(1, -2, 3))")
+
+      sql("ALTER TABLE t CHANGE COLUMN a.element TYPE SMALLINT")
+      sql("ALTER TABLE t ADD CONSTRAINT ch1 CHECK (hash(a[1]) = -1160545675)")
+      checkError(
+        intercept[DeltaAnalysisException] {
+          sql("ALTER TABLE t CHANGE COLUMN a.element TYPE INTEGER")
+        },
+        "DELTA_CONSTRAINT_DEPENDENT_COLUMN_CHANGE",
+        parameters = Map(
+          "columnName" -> "a.element",
+          "constraints" -> "delta.constraints.ch1 -> hash ( a [ 1 ] ) = - 1160545675"
+        )
+      )
+
+      sql("ALTER TABLE t CHANGE COLUMN m.value TYPE SMALLINT")
+      sql("ALTER TABLE t ADD CONSTRAINT ch2 CHECK (sign(m[7]) < 0)")
+      checkError(
+        intercept[DeltaAnalysisException] {
+          sql("ALTER TABLE t CHANGE COLUMN m.value TYPE INTEGER")
+        },
+        "DELTA_CONSTRAINT_DEPENDENT_COLUMN_CHANGE",
+        parameters = Map(
+          "columnName" -> "m.value",
+          "constraints" -> "delta.constraints.ch2 -> sign ( m [ 7 ] ) < 0"
+        )
+      )
+    }
+  }
+
   test(s"check constraint with type evolution") {
     withTable("t") {
       sql(s"CREATE TABLE t (a byte) USING DELTA")
@@ -189,6 +223,49 @@ trait TypeWideningConstraintsTests { self: QueryTest with TypeWideningTestMixin
     }
   }
 
+  test("check constraint on arrays and maps with type evolution") {
+    withTable("t") {
+      sql("CREATE TABLE t (s struct<arr: array<map<byte, byte>>>) USING DELTA")
+      sql("ALTER TABLE t ADD CONSTRAINT ck CHECK (s.arr[0][3] = 3)")
+      sql("INSERT INTO t(s) VALUES (struct(struct(array(map(1, 1, 3, 3)))))")
+      checkAnswer(sql("SELECT s.arr[0][3] FROM t"), Row(3))
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+        // Insert by name is not supported by type evolution.
+        checkError(
+          intercept[DeltaAnalysisException] {
+            // Migrate map's key to int type.
+            spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(999999 -> 1, 3 -> 3))))))
+              .toDF("s").withColumn("s", col("s").cast("struct<arr: array<map<int, byte>>>"))
+              .write.format("delta").mode("append").saveAsTable("t")
+          },
+          "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH",
+          parameters = Map(
+            "columnName" -> "s.arr.element.key",
+            "columnType" -> "TINYINT",
+            "dataType" -> "INT",
+            "constraints" -> "delta.constraints.ck -> s . arr [ 0 ] [ 3 ] = 3"
+          )
+        )
+        checkError(
+          intercept[DeltaAnalysisException] {
+            // Migrate map's value to int type.
+            spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(1 -> 999999, 3 -> 3))))))
+              .toDF("s").withColumn("s", col("s").cast("struct<arr: array<map<byte, int>>>"))
+              .write.format("delta").mode("append").saveAsTable("t")
+          },
+          "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH",
+          parameters = Map(
+            "columnName" -> "s.arr.element.value",
+            "columnType" -> "TINYINT",
+            "dataType" -> "INT",
+            "constraints" -> "delta.constraints.ck -> s . arr [ 0 ] [ 3 ] = 3"
+          )
+        )
+      }
+    }
+  }
+
   test("add constraint after type change then RESTORE") {
     withTable("t") {
       sql("CREATE TABLE t (a byte) USING DELTA")
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala
index 7f8ebc2033d..3dcd43f84d8 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 
 /**
@@ -92,6 +93,33 @@ trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest {
     }
   }
 
+  test("generated column on arrays and maps with type change") {
+    withTable("t") {
+      createTable(
+        tableName = "t",
+        path = None,
+        schemaString = "a array<struct<f: tinyint, g: tinyint>>, gen tinyint",
+        generatedColumns = Map("gen" -> "a[0].f"),
+        partitionColumns = Seq.empty
+      )
+      sql("INSERT INTO t (a) VALUES (array(named_struct('f', 7, 'g', 8)))")
+      checkAnswer(sql("SELECT gen FROM t"), Row(7))
+
+      sql("ALTER TABLE t CHANGE COLUMN a.element.g TYPE SMALLINT")
+      checkError(
+        intercept[DeltaAnalysisException] {
+          sql("ALTER TABLE t CHANGE COLUMN a.element.f TYPE SMALLINT")
+        },
+        "DELTA_GENERATED_COLUMNS_DEPENDENT_COLUMN_CHANGE",
+        parameters = Map(
+          "columnName" -> "a.element.f",
+          "generatedColumns" -> "gen -> a[0].f"
+        ))
+
+      checkAnswer(sql("SELECT gen FROM t"), Row(7))
+    }
+  }
+
   test("generated column with type evolution") {
     withTable("t") {
       createTable(
@@ -154,6 +182,40 @@ trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest {
     }
   }
 
+  test("generated column on arrays and maps with type evolution") {
+    withTable("t") {
+      createTable(
+        tableName = "t",
+        path = None,
+        schemaString = "a array<tinyint>, gen INT",
+        generatedColumns = Map("gen" -> "hash(a[0])"),
+        partitionColumns = Seq.empty
+      )
+      sql("INSERT INTO t (a) VALUES (array(2, 3))")
+      checkAnswer(sql("SELECT gen FROM t"), Row(1765031574))
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+        // Insert by name is not supported by type evolution.
+        checkError(
+          intercept[DeltaAnalysisException] {
+            spark.createDataFrame(Seq(Tuple1(Array(200000, 12345))))
+              .toDF("a").withColumn("a", col("a").cast("array<int>"))
+              .write.format("delta").mode("append").saveAsTable("t")
+          },
+          "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH",
+          parameters = Map(
+            "columnName" -> "a.element",
+            "columnType" -> "TINYINT",
+            "dataType" -> "INT",
+            "generatedColumns" -> "gen -> hash(a[0])"
+          )
+        )
+
+        checkAnswer(sql("SELECT gen FROM t"), Row(1765031574))
+      }
+    }
+  }
+
   test("generated column on nested field with complex type evolution") {
     withTable("t") {
       createTable(