
Commit 0c35eea

[Spark] Column Mapping DROP FEATURE (#3124)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Allow the column mapping feature to be dropped:

```
ALTER TABLE <table_name> DROP FEATURE columnMapping
```

The feature is hidden behind a flag.

## How was this patch tested?

New unit tests.

## Does this PR introduce _any_ user-facing changes?

No
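A hedged end-to-end sketch of the intended usage, based on the behavior the new tests exercise (the table name `my_table` is a placeholder; the gate is the `ALLOW_COLUMN_MAPPING_REMOVAL` conf that the tests reference from `DeltaSQLConf`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder().appName("drop-column-mapping-sketch").getOrCreate()

// The removal path is gated: without this conf the command fails with
// DeltaColumnMappingUnsupportedException (see the new suite below).
spark.conf.set(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL.key, "true")

// First attempt: rewrites the data files to strip column mapping metadata and removes the
// table property, then aborts the protocol downgrade with
// DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD because historical versions still use the feature.
try {
  spark.sql("ALTER TABLE my_table DROP FEATURE columnMapping")
} catch {
  case e: Exception => println(s"Expected on the first attempt: ${e.getMessage}")
}

// Second attempt: once the log retention window has passed (or history is explicitly
// truncated), the feature and the protocol versions can be downgraded.
spark.sql("ALTER TABLE my_table DROP FEATURE columnMapping TRUNCATE HISTORY")
```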
1 parent 529717b commit 0c35eea

File tree

4 files changed: +292 -6 lines changed


spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala

Lines changed: 32 additions & 0 deletions
@@ -22,8 +22,10 @@ import scala.util.control.NonFatal

 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
+import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
 import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.util.ScalaExtensions._

@@ -344,3 +346,33 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
     true
   }
 }
+case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2)
+  extends PreDowngradeTableFeatureCommand
+  with DeltaLogging {
+
+  /**
+   * We first remove the table feature property to prevent any transactions from writing data
+   * files with the physical names. This will cause any concurrent transactions to fail.
+   * Then, we run RemoveColumnMappingCommand to rewrite the files and rename the columns.
+   * Note, during the protocol downgrade phase we validate whether all invariants still hold.
+   * This should detect if any concurrent txns enabled the table property again.
+   *
+   * @return Returns true if it removed the table property and/or has rewritten the data.
+   *         False otherwise.
+   */
+  override def removeFeatureTracesIfNeeded(): Boolean = {
+    val spark = table.spark
+
+    // Latest snapshot looks clean. No action is required. We may proceed
+    // to the protocol downgrade phase.
+    if (ColumnMappingTableFeature.validateRemoval(table.initialSnapshot)) return false
+
+    recordDeltaOperation(
+      table.deltaLog,
+      opType = "delta.columnMappingFeatureRemoval") {
+      RemoveColumnMappingCommand(table.deltaLog, table.catalogTable)
+        .run(spark, removeColumnMappingTableProperty = true)
+    }
+    true
+  }
+}
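To make the two-phase contract in the doc comment concrete, here is a simplified, hypothetical sketch of how a `RemovableFeature`'s pre-downgrade command is driven by `ALTER TABLE ... DROP FEATURE`. It is not the actual DROP FEATURE command implementation, only the shape of the flow this PR plugs into:

```scala
import org.apache.spark.sql.delta.RemovableFeature
import org.apache.spark.sql.delta.catalog.DeltaTableV2

// Hypothetical driver, sketching the two phases only.
def dropFeatureSketch(feature: RemovableFeature, table: DeltaTableV2): Unit = {
  // Phase 1: remove traces of the feature. For column mapping this unsets the table
  // property and rewrites the data files via RemoveColumnMappingCommand.
  val cleanedUp = feature.preDowngradeCommand(table).removeFeatureTracesIfNeeded()

  // Phase 2: re-validate against the latest snapshot before the protocol downgrade, so a
  // concurrent transaction that re-enabled column mapping is detected and the drop fails.
  val snapshot = table.deltaLog.update()
  require(feature.validateRemoval(snapshot),
    s"Feature traces still present (cleanup ran: $cleanedUp); aborting protocol downgrade.")

  // The real command additionally enforces the log retention window (or TRUNCATE HISTORY)
  // before removing the feature from the protocol.
}
```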

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 23 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
 import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
@@ -490,6 +491,7 @@ object ColumnMappingTableFeature
     name = "columnMapping",
     minReaderVersion = 2,
     minWriterVersion = 5)
+  with RemovableFeature
   with FeatureAutomaticallyEnabledByMetadata {
   override def metadataRequiresFeatureToBeEnabled(
       metadata: Metadata,
@@ -499,6 +501,27 @@ object ColumnMappingTableFeature
       case _ => true
     }
   }
+
+  override def validateRemoval(snapshot: Snapshot): Boolean = {
+    val schemaHasNoColumnMappingMetadata =
+      SchemaMergingUtils.explode(snapshot.schema).forall { case (_, col) =>
+        !DeltaColumnMapping.hasPhysicalName(col) &&
+          !DeltaColumnMapping.hasColumnId(col)
+      }
+    val metadataHasNoMappingMode = snapshot.metadata.columnMappingMode match {
+      case NoMapping => true
+      case _ => false
+    }
+    schemaHasNoColumnMappingMetadata && metadataHasNoMappingMode
+  }
+
+  override def actionUsesFeature(action: Action): Boolean = action match {
+    case m: Metadata => DeltaConfigs.COLUMN_MAPPING_MODE.fromMetaData(m) != NoMapping
+    case _ => false
+  }
+
+  override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
+    ColumnMappingPreDowngradeCommand(table)
 }

 object IdentityColumnsTableFeature
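`validateRemoval` checks two things: no field in the schema still carries column mapping metadata, and the table-level mapping mode is back to `NoMapping`. The following self-contained sketch illustrates what that per-field metadata looks like on a Spark `StructField`; the literal key strings are assumptions shown for illustration (the real check goes through `DeltaColumnMapping.hasPhysicalName`/`hasColumnId` and recurses into nested fields via `SchemaMergingUtils.explode`):

```scala
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}

// Field-level metadata written while column mapping is enabled (key names are illustrative
// and assumed to match the constants DeltaColumnMapping checks for).
val mappedField = StructField(
  "a",
  IntegerType,
  nullable = true,
  metadata = new MetadataBuilder()
    .putLong("delta.columnMapping.id", 1L)
    .putString("delta.columnMapping.physicalName", "col-1234")
    .build())

val schema = StructType(Seq(mappedField))

// A schema like this would keep validateRemoval returning false until
// RemoveColumnMappingCommand has rewritten the table and stripped the metadata.
val hasMappingMetadata = schema.fields.exists { f =>
  f.metadata.contains("delta.columnMapping.id") ||
    f.metadata.contains("delta.columnMapping.physicalName")
}
assert(hasMappingMetadata)
```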

spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala

Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.columnmapping

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaConfigs._
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.util.ManualClock

/**
 * Test dropping column mapping feature from a table.
 */
class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {

  val clock = new ManualClock(System.currentTimeMillis())
  test("column mapping cannot be dropped without the feature flag") {
    withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
      sql(s"""CREATE TABLE $testTableName
             |USING delta
             |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
             | 'delta.minReaderVersion' = '3',
             | 'delta.minWriterVersion' = '7')
             |AS SELECT 1 as a
             |""".stripMargin)

      intercept[DeltaColumnMappingUnsupportedException] {
        dropColumnMappingTableFeature()
      }
    }
  }

  test("table without column mapping enabled") {
    sql(s"""CREATE TABLE $testTableName
           |USING delta
           |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
           |AS SELECT 1 as a
           |""".stripMargin)

    val e = intercept[DeltaTableFeatureException] {
      dropColumnMappingTableFeature()
    }
    checkError(e,
      errorClass = DeltaErrors.dropTableFeatureFeatureNotSupportedByProtocol(".")
        .getErrorClass,
      parameters = Map("feature" -> "columnMapping"))
  }

  test("invalid column names") {
    val invalidColName1 = colName("col1")
    val invalidColName2 = colName("col2")
    sql(
      s"""CREATE TABLE $testTableName (a INT, `$invalidColName1` INT, `$invalidColName2` INT)
         |USING delta
         |TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
         |""".stripMargin)
    val e = intercept[DeltaTableFeatureException] {
      dropColumnMappingTableFeature()
    }
  }

  test("drop column mapping from a table without table feature") {
    sql(
      s"""CREATE TABLE $testTableName
         |USING delta
         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
         | 'delta.minReaderVersion' = '1',
         | 'delta.minWriterVersion' = '1')
         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
         | FROM RANGE(0, $totalRows, 1, $numFiles)
         |""".stripMargin)
    val e = intercept[DeltaTableFeatureException] {
      dropColumnMappingTableFeature()
    }
    checkError(e,
      errorClass = "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT",
      parameters = Map("feature" -> "columnMapping"))
  }

  test("drop column mapping from a table with table feature") {
    sql(
      s"""CREATE TABLE $testTableName
         |USING delta
         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
         | 'delta.minReaderVersion' = '3',
         | 'delta.minWriterVersion' = '7')
         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
         | FROM RANGE(0, $totalRows, 1, $numFiles)
         |""".stripMargin)
    testDroppingColumnMapping()
  }

  test("drop column mapping from a table without column mapping table property") {
    sql(
      s"""CREATE TABLE $testTableName
         |USING delta
         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
         | 'delta.minReaderVersion' = '3',
         | 'delta.minWriterVersion' = '7')
         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
         | FROM RANGE(0, $totalRows, 1, $numFiles)
         |""".stripMargin)
    unsetColumnMappingProperty(useUnset = true)
    val e = intercept[DeltaTableFeatureException] {
      dropColumnMappingTableFeature()
    }
    checkError(
      e,
      errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
      parameters = Map(
        "feature" -> "columnMapping",
        "logRetentionPeriodKey" -> "delta.logRetentionDuration",
        "logRetentionPeriod" -> "30 days",
        "truncateHistoryLogRetentionPeriod" -> "24 hours")
    )
  }

  test("drop column mapping in id mode") {
    sql(
      s"""CREATE TABLE $testTableName
         |USING delta
         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id',
         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
         | 'delta.minReaderVersion' = '3',
         | 'delta.minWriterVersion' = '7')
         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
         | FROM RANGE(0, $totalRows, 1, $numFiles)
         |""".stripMargin)
    testDroppingColumnMapping()
  }

  def testDroppingColumnMapping(): Unit = {
    // Verify the input data is as expected.
    val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect()
    // Add a schema comment and verify it is preserved after the rewrite.
    val comment = "test comment"
    sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'")

    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
    val originalSnapshot = deltaLog.update()

    assert(originalSnapshot.schema.head.getComment().get == comment,
      "Renamed column should preserve comment.")
    val originalFiles = getFiles(originalSnapshot)
    val startingVersion = deltaLog.update().version

    val e = intercept[DeltaTableFeatureException] {
      dropColumnMappingTableFeature()
    }
    checkError(
      e,
      errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
      parameters = Map(
        "feature" -> "columnMapping",
        "logRetentionPeriodKey" -> "delta.logRetentionDuration",
        "logRetentionPeriod" -> "30 days",
        "truncateHistoryLogRetentionPeriod" -> "24 hours")
    )

    verifyRewrite(
      unsetTableProperty = true,
      deltaLog,
      originalFiles,
      startingVersion,
      originalData = originalData,
      droppedFeature = true)
    // Verify the schema comment is preserved after the rewrite.
    assert(deltaLog.update().schema.head.getComment().get == comment,
      "Should preserve the schema comment.")
    verifyDropFeatureTruncateHistory()
  }

  protected def verifyDropFeatureTruncateHistory() = {
    val deltaLog1 = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName), clock)
    // Populate the delta cache with the delta log with the right data path so it stores the clock.
    // This is currently the only way to make sure the drop feature command uses the clock.
    DeltaLog.clearCache()
    DeltaLog.forTable(spark, deltaLog1.dataPath, clock)
    // Set the log retention to 0 so that we can test truncate history.
    sql(
      s"""
         |ALTER TABLE $testTableName SET TBLPROPERTIES (
         |  '${TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION.key}' = '0 hours',
         |  '${LOG_RETENTION.key}' = '0 hours')
         |""".stripMargin)
    // Pretend enough time has passed for the history to be truncated.
    clock.advance(TimeUnit.MINUTES.toMillis(5))
    sql(
      s"""
         |ALTER TABLE $testTableName DROP FEATURE ${ColumnMappingTableFeature.name} TRUNCATE HISTORY
         |""".stripMargin)
    val newSnapshot = deltaLog.update()
    assert(newSnapshot.protocol.readerAndWriterFeatures.isEmpty, "Should drop the feature.")
    assert(newSnapshot.protocol.minWriterVersion == 1)
    assert(newSnapshot.protocol.minReaderVersion == 1)
  }

  protected def dropColumnMappingTableFeature(): Unit = {
    sql(
      s"""
         |ALTER TABLE $testTableName DROP FEATURE ${ColumnMappingTableFeature.name}
         |""".stripMargin)
  }
}

spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala

Lines changed: 13 additions & 6 deletions
@@ -70,7 +70,7 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
     val originalSnapshot = deltaLog.update()

     assert(originalSnapshot.schema.head.getComment().get == comment,
-      "Renamed column should preserved comment.")
+      "Renamed column should preserve comment.")
     val originalFiles = getFiles(originalSnapshot)
     val startingVersion = deltaLog.update().version

@@ -96,22 +96,29 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
       deltaLog: DeltaLog,
       originalFiles: Array[AddFile],
       startingVersion: Long,
-      originalData: Array[Row]): Unit = {
+      originalData: Array[Row],
+      droppedFeature: Boolean = false): Unit = {
     checkAnswer(
       spark.table(tableName = testTableName).select(logicalColumnName),
       originalData)
-
     val newSnapshot = deltaLog.update()
-    assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")
+    val versionsAddedByRewrite = if (droppedFeature) {
+      2
+    } else {
+      1
+    }
+    assert(newSnapshot.version - startingVersion == versionsAddedByRewrite,
+      s"Should rewrite the table in $versionsAddedByRewrite commits.")

-    val history = deltaLog.history.getHistory(deltaLog.update().version)
+    val rewriteVersion = deltaLog.update().version - versionsAddedByRewrite + 1
+    val history = deltaLog.history.getHistory(rewriteVersion, Some(rewriteVersion))
     verifyColumnMappingOperationIsRecordedInHistory(history)

     assert(newSnapshot.schema.head.name == logicalColumnName, "Should rename the first column.")

     verifyColumnMappingSchemaMetadataIsRemoved(newSnapshot)

-    verifyColumnMappingTablePropertiesAbsent(newSnapshot, unsetTableProperty)
+    verifyColumnMappingTablePropertiesAbsent(newSnapshot, unsetTableProperty || droppedFeature)
     assert(originalFiles.map(_.numLogicalRecords.get).sum ==
       newSnapshot.allFiles.map(_.numLogicalRecords.get).collect().sum,
       "Should have the same number of records.")
