Skip to content

Commit 2990111

Browse files
committed
Add support for renaming column in Delta Lake
1 parent 0df8e32 commit 2990111

File tree

5 files changed

+222
-3
lines changed

5 files changed

+222
-3
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
4444
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
4545
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
46+
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
4647
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
4748
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
4849
import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
@@ -268,9 +269,11 @@ public class DeltaLakeMetadata
268269
LAZY_SIMPLE_SERDE_CLASS,
269270
SEQUENCEFILE_INPUT_FORMAT_CLASS,
270271
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
272+
// Operation names in Delta Lake https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
271273
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
272274
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
273275
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
276+
public static final String RENAME_COLUMN_OPERATION = "RENAME COLUMN";
274277
public static final String INSERT_OPERATION = "WRITE";
275278
public static final String MERGE_OPERATION = "MERGE";
276279
public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
@@ -1221,6 +1224,66 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
12211224
}
12221225
}
12231226

1227+
@Override
1228+
public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, String newColumnName)
1229+
{
1230+
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
1231+
DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle;
1232+
String sourceColumnName = deltaLakeColumn.getName();
1233+
1234+
ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
1235+
if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) {
1236+
throw new TrinoException(NOT_SUPPORTED, "Cannot rename column with the column mapping: " + columnMappingMode);
1237+
}
1238+
1239+
ConnectorTableMetadata tableMetadata = getTableMetadata(session, table);
1240+
long commitVersion = table.getReadVersion() + 1;
1241+
List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties()).stream()
1242+
.map(columnName -> columnName.equals(sourceColumnName) ? newColumnName : columnName)
1243+
.collect(toImmutableList());
1244+
1245+
List<DeltaLakeColumnHandle> columns = tableMetadata.getColumns().stream()
1246+
.filter(column -> !column.isHidden())
1247+
.map(column -> toColumnHandle(
1248+
column.getName().equals(sourceColumnName) ? ColumnMetadata.builderFrom(column).setName(newColumnName).build() : column,
1249+
column.getName().equals(sourceColumnName) ? newColumnName : column.getName(),
1250+
column.getType(),
1251+
partitionColumns))
1252+
.collect(toImmutableList());
1253+
Map<String, String> columnComments = getColumnComments(table.getMetadataEntry()).entrySet().stream()
1254+
.map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
1255+
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
1256+
Map<String, Boolean> columnsNullability = getColumnsNullability(table.getMetadataEntry()).entrySet().stream()
1257+
.map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
1258+
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
1259+
Map<String, Map<String, Object>> columnMetadata = getColumnsMetadata(table.getMetadataEntry()).entrySet().stream()
1260+
.map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
1261+
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
1262+
try {
1263+
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation());
1264+
appendTableEntries(
1265+
commitVersion,
1266+
transactionLogWriter,
1267+
table.getMetadataEntry().getId(),
1268+
columns,
1269+
partitionColumns,
1270+
columnComments,
1271+
columnsNullability,
1272+
columnMetadata,
1273+
table.getMetadataEntry().getConfiguration(),
1274+
RENAME_COLUMN_OPERATION,
1275+
session,
1276+
nodeVersion,
1277+
nodeId,
1278+
Optional.ofNullable(table.getMetadataEntry().getDescription()),
1279+
getProtocolEntry(session, table.getSchemaTableName()));
1280+
transactionLogWriter.flush();
1281+
}
1282+
catch (Exception e) {
1283+
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to rename '%s' column for: %s.%s", sourceColumnName, table.getSchemaName(), table.getTableName()), e);
1284+
}
1285+
}
1286+
12241287
private static void appendTableEntries(
12251288
long commitVersion,
12261289
TransactionLogWriter transactionLogWriter,

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
112112
return false;
113113

114114
case SUPPORTS_DROP_COLUMN:
115-
case SUPPORTS_RENAME_COLUMN:
116115
case SUPPORTS_SET_COLUMN_TYPE:
117116
return false;
118117

@@ -363,6 +362,33 @@ public void testDropNonEmptySchemaWithTable()
363362
assertUpdate("DROP SCHEMA " + schemaName);
364363
}
365364

365+
@Override
366+
public void testRenameColumn()
367+
{
368+
// Override because the connector doesn't support renaming columns with 'none' column mapping
369+
// There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
370+
assertThatThrownBy(super::testRenameColumn)
371+
.hasMessageContaining("Cannot rename column with the column mapping: NONE");
372+
}
373+
374+
@Override
375+
public void testAlterTableRenameColumnToLongName()
376+
{
377+
// Override because the connector doesn't support renaming columns with 'none' column mapping
378+
// There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
379+
assertThatThrownBy(super::testAlterTableRenameColumnToLongName)
380+
.hasMessageContaining("Cannot rename column with the column mapping: NONE");
381+
}
382+
383+
@Override
384+
public void testRenameColumnName(String columnName)
385+
{
386+
// Override because the connector doesn't support renaming columns with 'none' column mapping
387+
// There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
388+
assertThatThrownBy(() -> super.testRenameColumnName(columnName))
389+
.hasMessageContaining("Cannot rename column with the column mapping: NONE");
390+
}
391+
366392
@Override
367393
public void testCharVarcharComparison()
368394
{

testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks104.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
3232
return ImmutableList.of(
3333
testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks104.class)
3434
.withGroups("configured_features", "delta-lake-databricks")
35+
.withExcludedGroups("delta-lake-exclude-104")
3536
.withExcludedTests(getExcludedTests())
3637
.build());
3738
}

testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public final class TestGroups
8484
public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks";
8585
public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73";
8686
public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91";
87+
public static final String DELTA_LAKE_EXCLUDE_104 = "delta-lake-exclude-104";
8788
public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113";
8889
public static final String HUDI = "hudi";
8990
public static final String PARQUET = "parquet";

testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static io.trino.tempto.assertions.QueryAssert.assertThat;
2727
import static io.trino.testing.TestingNames.randomNameSuffix;
2828
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
29+
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104;
2930
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
3031
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
3132
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
@@ -350,8 +351,6 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode)
350351
.hasMessageContaining("Delta Lake writer version 5 which is not supported");
351352
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN new_col varchar"))
352353
.hasMessageContaining("Delta Lake writer version 5 which is not supported");
353-
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO renamed_column"))
354-
.hasMessageContaining("This connector does not support renaming columns");
355354
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN a_number"))
356355
.hasMessageContaining("This connector does not support dropping columns");
357356
}
@@ -391,6 +390,135 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode)
391390
}
392391
}
393392

393+
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
394+
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
395+
public void testTrinoRenameColumnWithColumnMappingMode(String mode)
396+
{
397+
String tableName = "test_rename_column_" + randomNameSuffix();
398+
399+
onDelta().executeQuery("" +
400+
"CREATE TABLE default." + tableName +
401+
" (id INT, data INT, part STRING)" +
402+
" USING delta " +
403+
" PARTITIONED BY (part) " +
404+
" LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
405+
" TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");
406+
407+
try {
408+
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");
409+
410+
onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data");
411+
onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN part TO new_part");
412+
413+
assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
414+
.containsOnly(
415+
row("id", "integer", "", ""),
416+
row("new_data", "integer", "", ""),
417+
row("new_part", "varchar", "", ""));
418+
419+
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
420+
.containsOnly(row(1, 10, "part#1"));
421+
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
422+
.containsOnly(row(1, 10, "part#1"));
423+
424+
// Ensure renaming to the dropped column doesn't restore the old data
425+
// TODO: Drop a column in Trino once the connector supports the syntax
426+
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN id"))
427+
.hasMessageContaining("This connector does not support dropping columns");
428+
onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
429+
onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");
430+
431+
assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
432+
.containsOnly(row(10, "part#1"));
433+
assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
434+
.containsOnly(row(10, "part#1"));
435+
}
436+
finally {
437+
onDelta().executeQuery("DROP TABLE default." + tableName);
438+
}
439+
}
440+
441+
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
442+
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
443+
public void testSparkRenameColumnWithColumnMappingMode(String mode)
444+
{
445+
String tableName = "test_spark_rename_column_" + randomNameSuffix();
446+
447+
onDelta().executeQuery("" +
448+
"CREATE TABLE default." + tableName +
449+
" (id INT, data INT, part STRING)" +
450+
" USING delta " +
451+
" PARTITIONED BY (part) " +
452+
" LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
453+
" TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");
454+
455+
try {
456+
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");
457+
458+
onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data");
459+
onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN part TO new_part");
460+
461+
assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
462+
.containsOnly(
463+
row("id", "integer", "", ""),
464+
row("new_data", "integer", "", ""),
465+
row("new_part", "varchar", "", ""));
466+
467+
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
468+
.containsOnly(row(1, 10, "part#1"));
469+
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
470+
.containsOnly(row(1, 10, "part#1"));
471+
472+
// Ensure renaming to the dropped column doesn't restore the old data
473+
onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
474+
onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");
475+
476+
assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
477+
.containsOnly(row(10, "part#1"));
478+
assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
479+
.containsOnly(row(10, "part#1"));
480+
}
481+
finally {
482+
onDelta().executeQuery("DROP TABLE default." + tableName);
483+
}
484+
}
485+
486+
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS})
487+
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
488+
public void testUnsupportedRenameColumnWithColumnMappingModeNone()
489+
{
490+
String tableName = "test_unsupported_rename_column_" + randomNameSuffix();
491+
492+
onDelta().executeQuery("" +
493+
"CREATE TABLE default." + tableName +
494+
" (id INT, data INT)" +
495+
" USING delta " +
496+
" LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
497+
" TBLPROPERTIES ('delta.columnMapping.mode' = 'none')");
498+
499+
try {
500+
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10)");
501+
502+
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data"))
503+
.hasMessageContaining("Cannot rename column with the column mapping: NONE");
504+
assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data"))
505+
.hasMessageContaining(" Column rename is not supported for your Delta table");
506+
507+
assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
508+
.containsOnly(
509+
row("id", "integer", "", ""),
510+
row("data", "integer", "", ""));
511+
512+
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
513+
.containsOnly(row(1, 10));
514+
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
515+
.containsOnly(row(1, 10));
516+
}
517+
finally {
518+
onDelta().executeQuery("DROP TABLE default." + tableName);
519+
}
520+
}
521+
394522
@DataProvider
395523
public Object[][] columnMappingDataProvider()
396524
{

0 commit comments

Comments
 (0)