Skip to content

Commit

Permalink
Disallow enabling column mapping if invalid column mapping metadata i…
Browse files Browse the repository at this point in the history
…s already present (delta-io#4167)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

As effect of earlier bugs (e.g. fixed in
delta-io#3487) there can exists tables
where column mapping is disabled, but there is column mapping metadata
on the table. Enabling column mapping metadata on such a table could
lead to unexpected corruption. Simply stripping such metadata could also
lead to curruptions, as the invalid metadata can be already used in
other places (e.g. column statistics) via
DeltaColumnMapping.getPhysicalName, which returns the name from the
metadata even when column mapping is disabled.

After delta-io#3688 it should no longer be
possible to end up with tables having such invalid metadata, so the
issue only concerns existing tables created before that fix.

To avoid corruption, we want to disallow enabling column mapping on such
tables.

## How was this patch tested?

Added tests to DeltaColumnMappingSuite.

## Does this PR introduce _any_ user-facing changes?

No.
We are disallowing an operation on tables that would lead to Delta table
corruption on tables that are already in an invalid state entering which
is fixed already, so it can only concern old tables in the wild.

---------

Co-authored-by: Julek Sompolski <Juliusz Sompolski>
  • Loading branch information
juliuszsompolski authored Feb 19, 2025
1 parent 66d8099 commit 50a99fd
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 0 deletions.
7 changes: 7 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,13 @@
],
"sqlState" : "42K03"
},
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS" : {
"message" : [
"Enabling column mapping when column mapping metadata is already present in schema is not supported.",
"To use column mapping, create a new table and reload the data into it."
],
"sqlState" : "XXKDS"
},
"DELTA_EXCEED_CHAR_VARCHAR_LIMIT" : {
"message" : [
"Value \"<value>\" exceeds char/varchar type length limitation. Failed check: <expr>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}
}

// If column mapping was disabled, but there was already column mapping in the schema, it is
// a result of a bug in the previous version of Delta. This should no longer happen with the
// stripping done above. For existing tables with this issue, we should not allow enabling
// column mapping, to prevent further corruption.
if (spark.conf.get(DeltaSQLConf.
DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS)) {
if (oldMappingMode == NoMapping && newMappingMode != NoMapping &&
schemaHasColumnMappingMetadata(oldMetadata.schema)) {
throw DeltaErrors.enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists()
}
}

updatedMetadata = updateColumnMappingMetadata(
oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2126,6 +2126,12 @@ trait DeltaErrorsBase
messageParameters = Array(oldMode, newMode))
}

def enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists(): Throwable = {
new DeltaColumnMappingUnsupportedException(
errorClass =
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
}

def generateManifestWithColumnMappingNotSupported: Throwable = {
new DeltaColumnMappingUnsupportedException(
errorClass = "DELTA_UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,18 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS =
buildConf("columnMapping.disallowEnablingWhenColumnMappingMetadataAlreadyExists")
.doc(
"""
|If Delta table already has column mapping metadata before the feature is enabled, it is
|as a result of a corruption or a bug. Enabling column mapping in such a case can lead to
|further corruption of the table and should be disallowed.
|""".stripMargin)
.internal()
.booleanConf
.createWithDefault(true)

val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
buildConf("dynamicPartitionOverwrite.enabled")
.doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2106,4 +2106,49 @@ class DeltaColumnMappingSuite extends QueryTest
s"Supported modes are: $supportedModes"))
}
}

test("enabling column mapping disallowed if column mapping metadata already exists") {
withSQLConf(
// enabling this fixes the issue of committing invalid metadata in the first place
DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false"
) {
withTempDir { dir =>
val path = dir.getCanonicalPath
val deltaLog = DeltaLog.forTable(spark, path)
deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
val schema =
new StructType().add("id", IntegerType, true, withIdAndPhysicalName(0, "col-0"))
val metadata = actions.Metadata(
name = "test_table",
schemaString = schema.json,
configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name)
)
txn.updateMetadata(metadata)
txn.commit(Seq.empty, DeltaOperations.ManualUpdate)

// Enabling the config will disallow enabling column mapping.
withSQLConf(DeltaSQLConf
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
-> "true") {
val e = intercept[DeltaColumnMappingUnsupportedException] {
alterTableWithProps(
s"delta.`$path`",
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
}
assert(e.getErrorClass ==
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
}

// Disabling the config will allow enabling column mapping.
withSQLConf(DeltaSQLConf
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
-> "false") {
alterTableWithProps(
s"delta.`$path`",
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
}
}
}
}
}
}

0 comments on commit 50a99fd

Please sign in to comment.