Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow enabling column mapping if invalid column mapping metadata is already present #4167

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,12 @@
],
"sqlState" : "42K03"
},
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS" : {
"message" : [
"Enabling column mapping when column mapping metadata is already present in schema is not supported."
],
"sqlState" : "XXKDS"
},
"DELTA_EXCEED_CHAR_VARCHAR_LIMIT" : {
"message" : [
"Value \"<value>\" exceeds char/varchar type length limitation. Failed check: <expr>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}
}

// If column mapping was disabled, but there was already column mapping in the schema, it is
// a result of a bug in the previous version of Delta. This should no longer happen with the
// stripping done above. For existing tables with this issue, we should not allow enabling
// column mapping, to prevent further corruption.
if (spark.conf.get(DeltaSQLConf.
DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS)) {
if (oldMappingMode == NoMapping && newMappingMode != NoMapping &&
schemaHasColumnMappingMetadata(oldMetadata.schema)) {
throw DeltaErrors.enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists()
}
}

updatedMetadata = updateColumnMappingMetadata(
oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2126,6 +2126,12 @@ trait DeltaErrorsBase
messageParameters = Array(oldMode, newMode))
}

def enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists(): Throwable = {
new DeltaColumnMappingUnsupportedException(
errorClass =
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
}

def generateManifestWithColumnMappingNotSupported: Throwable = {
new DeltaColumnMappingUnsupportedException(
errorClass = "DELTA_UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,18 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS =
buildConf("columnMapping.disallowEnablingWhenColumnMappingMetadataAlreadyExists")
.doc(
"""
|If Delta table already has column mapping metadata before the feature is enabled, it is
|as a result of a corruption or a bug. Enabling column mapping in such a case can lead to
|further corruption of the table and should be disallowed.
|""".stripMargin)
.internal()
.booleanConf
.createWithDefault(true)

val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
buildConf("dynamicPartitionOverwrite.enabled")
.doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2106,4 +2106,49 @@ class DeltaColumnMappingSuite extends QueryTest
s"Supported modes are: $supportedModes"))
}
}

test("enabling column mapping disallowed if column mapping metadata already exists") {
withSQLConf(
// enabling this fixes the issue of committing invalid metadata in the first place
DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false"
) {
withTempDir { dir =>
val path = dir.getCanonicalPath
val deltaLog = DeltaLog.forTable(spark, path)
deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
val schema =
new StructType().add("id", IntegerType, true, withIdAndPhysicalName(0, "col-0"))
val metadata = actions.Metadata(
name = "test_table",
schemaString = schema.json,
configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name)
)
txn.updateMetadata(metadata)
txn.commit(Seq.empty, DeltaOperations.ManualUpdate)

// Enabling the config will disallow enabling column mapping.
withSQLConf(DeltaSQLConf
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
-> "true") {
val e = intercept[DeltaColumnMappingUnsupportedException] {
alterTableWithProps(
s"delta.`$path`",
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
}
assert(e.getErrorClass ==
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
}

// Disabling the config will allow enabling column mapping.
withSQLConf(DeltaSQLConf
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
-> "false") {
alterTableWithProps(
s"delta.`$path`",
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
}
}
}
}
}
}
Loading