Skip to content

Commit 7b35259

Browse files
authored
[Spark] Enable UniForm Without Rewrite (#3379)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Users can now enable Delta UniForm directly using the `ALTER TABLE SET TBLPROPERTIES` command. This only converts the corresponding metadata from `delta` to `iceberg`, without rewriting the underlying parquet files. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? Through manual tests and e2e tests. <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? Yes, this PR lets users enable Delta UniForm directly via the `ALTER TABLE SET TBLPROPERTIES` command. 
<!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 573a57f commit 7b35259

File tree

4 files changed

+92
-46
lines changed

4 files changed

+92
-46
lines changed

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,7 +1106,7 @@
11061106
},
11071107
"DELETION_VECTORS_SHOULD_BE_DISABLED" : {
11081108
"message" : [
1109-
"IcebergCompatV<version> requires Deletion Vectors to be disabled on the table. Please use the ALTER TABLE DROP FEATURE command to disable Deletion Vectors and to remove the existing Deletion Vectors from the table."
1109+
"IcebergCompatV<version> requires Deletion Vectors to be disabled on the table first. Then run REORG PURGE command to purge the Deletion Vectors on the table."
11101110
]
11111111
},
11121112
"DISABLING_REQUIRED_TABLE_FEATURE" : {
@@ -1152,7 +1152,7 @@
11521152
},
11531153
"VERSION_MUTUAL_EXCLUSIVE" : {
11541154
"message" : [
1155-
"Only one IcebergCompat version can be enabled."
1155+
"Only one IcebergCompat version can be enabled, please explicitly disable all other IcebergCompat versions that are not needed."
11561156
]
11571157
},
11581158
"WRONG_REQUIRED_TABLE_PROPERTY" : {

spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ object IcebergCompatV1 extends IcebergCompat(
4747
CheckAddFileHasStats,
4848
CheckNoPartitionEvolution,
4949
CheckNoListMapNullType,
50-
CheckNoDeletionVector,
51-
CheckVersionChangeNeedsRewrite)
50+
CheckDeletionVectorDisabled
51+
)
5252
)
5353

5454
object IcebergCompatV2 extends IcebergCompat(
@@ -61,8 +61,8 @@ object IcebergCompatV2 extends IcebergCompat(
6161
CheckAddFileHasStats,
6262
CheckTypeInV2AllowList,
6363
CheckNoPartitionEvolution,
64-
CheckNoDeletionVector,
65-
CheckVersionChangeNeedsRewrite)
64+
CheckDeletionVectorDisabled
65+
)
6666
)
6767

6868
/**
@@ -106,14 +106,16 @@ case class IcebergCompat(
106106
prevSnapshot: Snapshot,
107107
newestProtocol: Protocol,
108108
newestMetadata: Metadata,
109-
isCreatingOrReorgTable: Boolean,
109+
operation: Option[DeltaOperations.Operation],
110110
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
111111
val prevProtocol = prevSnapshot.protocol
112112
val prevMetadata = prevSnapshot.metadata
113113
val wasEnabled = this.isEnabled(prevMetadata)
114114
val isEnabled = this.isEnabled(newestMetadata)
115115
val tableId = newestMetadata.id
116116

117+
val isCreatingOrReorgTable = UniversalFormat.isCreatingOrReorgTable(operation)
118+
117119
(wasEnabled, isEnabled) match {
118120
case (_, false) => (None, None) // not enable or disabling, Ignore
119121
case (_, true) => // Enabling now or already-enabled
@@ -186,10 +188,15 @@ case class IcebergCompat(
186188
} else None
187189

188190
// Apply additional checks
189-
val context = IcebergCompatContext(prevSnapshot,
191+
val context = IcebergCompatContext(
192+
prevSnapshot,
190193
protocolResult.getOrElse(newestProtocol),
191194
metadataResult.getOrElse(newestMetadata),
192-
isCreatingOrReorgTable, actions, tableId, version)
195+
operation,
196+
actions,
197+
tableId,
198+
version
199+
)
193200
checks.foreach(_.apply(context))
194201

195202
(protocolResult, metadataResult)
@@ -241,7 +248,10 @@ object IcebergCompat extends DeltaLogging {
241248
* @return true if the target version is enabled on the table.
242249
*/
243250
def isVersionEnabled(metadata: Metadata, version: Integer): Boolean =
244-
knownVersions.exists{ case (_, v) => v == version }
251+
knownVersions.exists {
252+
case (config, v) =>
253+
(v == version) && (config.fromMetaData(metadata).getOrElse(false))
254+
}
245255
}
246256

247257
/**
@@ -293,7 +303,7 @@ case class IcebergCompatContext(
293303
prevSnapshot: Snapshot,
294304
newestProtocol: Protocol,
295305
newestMetadata: Metadata,
296-
isCreatingOrReorgTable: Boolean,
306+
operation: Option[DeltaOperations.Operation],
297307
actions: Seq[Action],
298308
tableId: String,
299309
version: Integer) {
@@ -310,9 +320,9 @@ trait IcebergCompatCheck extends (IcebergCompatContext => Unit)
310320
object CheckOnlySingleVersionEnabled extends IcebergCompatCheck {
311321
override def apply(context: IcebergCompatContext): Unit = {
312322
val numEnabled = IcebergCompat.knownVersions
313-
.map{ case (config, _) =>
314-
if (config.fromMetaData(context.newestMetadata).getOrElse(false)) 1 else 0 }
315-
.sum
323+
.map { case (config, _) =>
324+
if (config.fromMetaData(context.newestMetadata).getOrElse(false)) 1 else 0
325+
}.sum
316326
if (numEnabled > 1) {
317327
throw DeltaErrors.icebergCompatVersionMutualExclusive(context.version)
318328
}
@@ -388,34 +398,40 @@ object CheckTypeInV2AllowList extends IcebergCompatCheck {
388398
}
389399
}
390400

391-
object CheckNoDeletionVector extends IcebergCompatCheck {
392-
393-
override def apply(context: IcebergCompatContext): Unit = {
394-
// Check for incompatible table features;
395-
// Deletion Vectors cannot be writeable; Note that concurrent txns are also covered
396-
// to NOT write deletion vectors as that txn would need to make DVs writable, which
397-
// would conflict with current txn because of metadata change.
398-
if (DeletionVectorUtils.deletionVectorsWritable(
399-
context.newestProtocol, context.newestMetadata)) {
400-
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
401-
}
402-
}
403-
}
404-
405-
406401
/**
407-
* Check if change IcebergCompat version needs a REORG operation
402+
* Check if the deletion vector has been disabled by previous snapshot
403+
* or newest metadata and protocol depending on whether the operation
404+
* is REORG UPGRADE UNIFORM or not.
408405
*/
409-
object CheckVersionChangeNeedsRewrite extends IcebergCompatCheck {
410-
411-
private val versionChangesWithoutRewrite: Map[Int, Set[Int]] =
412-
Map(0 -> Set(0, 1), 1 -> Set(0, 1), 2 -> Set(0, 1, 2))
406+
object CheckDeletionVectorDisabled extends IcebergCompatCheck {
413407
override def apply(context: IcebergCompatContext): Unit = {
414-
if (!context.isCreatingOrReorgTable) {
415-
val oldVersion = IcebergCompat.getEnabledVersion(context.prevMetadata).getOrElse(0)
416-
val allowedChanges = versionChangesWithoutRewrite.getOrElse(oldVersion, Set.empty[Int])
417-
if (!allowedChanges.contains(context.version)) {
418-
throw DeltaErrors.icebergCompatChangeVersionNeedRewrite(oldVersion, context.version)
408+
if (context.newestProtocol.isFeatureSupported(DeletionVectorsTableFeature)) {
409+
// note: user will need to *separately* disable deletion vectors if this check fails,
410+
// i.e., ALTER TABLE SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');
411+
val isReorgUpgradeUniform = UniversalFormat.isReorgUpgradeUniform(context.operation)
412+
// for REORG UPGRADE UNIFORM, we only need to check whether DV
413+
// is enabled in the newest metadata and protocol, this conforms with
414+
// the semantics of REORG UPGRADE UNIFORM, which will automatically disable
415+
// DV and rewrite all the parquet files with DV removed, for now.
416+
if (isReorgUpgradeUniform) {
417+
if (DeletionVectorUtils.deletionVectorsWritable(
418+
protocol = context.newestProtocol,
419+
metadata = context.newestMetadata
420+
)) {
421+
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
422+
}
423+
} else {
424+
// for other commands, we need to check whether DV is disabled from the
425+
// previous snapshot, in case there are concurrent writers.
426+
// plus, we also need to check from the newest metadata and protocol,
427+
// in case we are creating a new uniform table with DV enabled.
428+
if (DeletionVectorUtils.deletionVectorsWritable(context.prevSnapshot) ||
429+
DeletionVectorUtils.deletionVectorsWritable(
430+
protocol = context.newestProtocol,
431+
metadata = context.newestMetadata
432+
)) {
433+
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
434+
}
419435
}
420436
}
421437
}

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1717,7 +1717,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
17171717
snapshot,
17181718
newestProtocol = protocol, // Note: this will try to use `newProtocol`
17191719
newestMetadata = metadata, // Note: this will try to use `newMetadata`
1720-
isCreatingNewTable || op.isInstanceOf[DeltaOperations.UpgradeUniformProperties],
1720+
Some(op),
17211721
otherActions
17221722
)
17231723
newProtocol = protocolUpdate1.orElse(newProtocol)

spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,35 @@ object UniversalFormat extends DeltaLogging {
5555
val HUDI_FORMAT = "hudi"
5656
val SUPPORTED_FORMATS = Set(HUDI_FORMAT, ICEBERG_FORMAT)
5757

58+
/**
59+
* Check if the operation is CREATE/REPLACE TABLE or REORG UPGRADE UNIFORM commands.
60+
*
61+
* @param op the delta operation to be checked.
62+
* @return whether the operation is create or reorg.
63+
*/
64+
def isCreatingOrReorgTable(op: Option[DeltaOperations.Operation]): Boolean = op match {
65+
case Some(_: DeltaOperations.CreateTable) |
66+
Some(_: DeltaOperations.UpgradeUniformProperties) |
67+
// REPLACE TABLE is also considered creating table to preserve the
68+
// the semantics for `isCreatingNewTable` in `OptimisticTransaction`.
69+
Some(_: DeltaOperations.ReplaceTable) =>
70+
true
71+
// this is to conform with the semantics in `enforceDependenciesInConfiguration`
72+
case None => true
73+
case _ => false
74+
}
75+
76+
/**
77+
* Check if the operation is REORG UPGRADE UNIFORM command.
78+
*
79+
* @param op the delta operation to be checked.
80+
* @return whether the operation is REORG UPGRADE UNIFORM.
81+
*/
82+
def isReorgUpgradeUniform(op: Option[DeltaOperations.Operation]): Boolean = op match {
83+
case Some(_: DeltaOperations.UpgradeUniformProperties) => true
84+
case _ => false
85+
}
86+
5887
def icebergEnabled(metadata: Metadata): Boolean = {
5988
DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetaData(metadata).contains(ICEBERG_FORMAT)
6089
}
@@ -83,11 +112,11 @@ object UniversalFormat extends DeltaLogging {
83112
snapshot: Snapshot,
84113
newestProtocol: Protocol,
85114
newestMetadata: Metadata,
86-
isCreatingOrReorgTable: Boolean,
115+
operation: Option[DeltaOperations.Operation],
87116
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
88117
enforceHudiDependencies(newestMetadata, snapshot)
89118
enforceIcebergInvariantsAndDependencies(
90-
snapshot, newestProtocol, newestMetadata, isCreatingOrReorgTable, actions)
119+
snapshot, newestProtocol, newestMetadata, operation, actions)
91120
}
92121

93122
/**
@@ -125,7 +154,7 @@ object UniversalFormat extends DeltaLogging {
125154
snapshot: Snapshot,
126155
newestProtocol: Protocol,
127156
newestMetadata: Metadata,
128-
isCreatingOrReorg: Boolean,
157+
operation: Option[DeltaOperations.Operation],
129158
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
130159

131160
val prevMetadata = snapshot.metadata
@@ -174,7 +203,7 @@ object UniversalFormat extends DeltaLogging {
174203
snapshot,
175204
newestProtocol = protocolToCheck,
176205
newestMetadata = metadataToCheck,
177-
isCreatingOrReorg,
206+
operation,
178207
actions
179208
)
180209
protocolToCheck = v1protocolUpdate.getOrElse(protocolToCheck)
@@ -185,7 +214,7 @@ object UniversalFormat extends DeltaLogging {
185214
snapshot,
186215
newestProtocol = protocolToCheck,
187216
newestMetadata = metadataToCheck,
188-
isCreatingOrReorg,
217+
operation,
189218
actions
190219
)
191220
changed ||= v2protocolUpdate.nonEmpty || v2metadataUpdate.nonEmpty
@@ -218,7 +247,7 @@ object UniversalFormat extends DeltaLogging {
218247
snapshot,
219248
newestProtocol = snapshot.protocol,
220249
newestMetadata = metadata,
221-
isCreatingOrReorgTable = true,
250+
operation = None,
222251
actions = Seq()
223252
)
224253

@@ -257,6 +286,7 @@ object UniversalFormat extends DeltaLogging {
257286
}
258287
}
259288
}
289+
260290
/** Class to facilitate the conversion of Delta into other table formats. */
261291
abstract class UniversalFormatConverter(spark: SparkSession) {
262292
/**

0 commit comments

Comments
 (0)