
Commit 4732ce6

Add support for renaming column in Delta Lake
1 parent 98f3281

File tree (3 files changed, 287 additions, 0 deletions):

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java

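For orientation, this commit enables ALTER TABLE ... RENAME COLUMN through the Delta Lake connector for tables whose column mapping mode is 'name' or 'id'. A minimal usage sketch, assuming a name-mapped table created from the Spark/Delta side (the table name rename_demo and the bucket path are placeholders, not part of this commit):

    -- Spark/Delta: create a table with name-based column mapping
    CREATE TABLE default.rename_demo (id INT, data INT)
    USING delta
    LOCATION 's3://<bucket>/rename_demo'
    TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

    -- Trino: rename a column through the Delta Lake connector
    ALTER TABLE delta.default.rename_demo RENAME COLUMN data TO new_data;

    -- The data stays readable under the new name from both engines
    SELECT id, new_data FROM delta.default.rename_demo;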

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 78 additions & 0 deletions
@@ -274,9 +274,11 @@ public class DeltaLakeMetadata
             LAZY_SIMPLE_SERDE_CLASS,
             SEQUENCEFILE_INPUT_FORMAT_CLASS,
             HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
+    // Operation names in Delta Lake https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
     public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
     public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
     public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
+    public static final String RENAME_COLUMN_OPERATION = "RENAME COLUMN";
     public static final String INSERT_OPERATION = "WRITE";
     public static final String MERGE_OPERATION = "MERGE";
     public static final String OPTIMIZE_OPERATION = "OPTIMIZE";

@@ -1209,6 +1211,82 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
         }
     }

+    @Override
+    public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, String newColumnName)
+    {
+        DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
+        DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle;
+        String sourceColumnName = deltaLakeColumn.getName();
+
+        if (changeDataFeedEnabled(table.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnName)) {
+            throw new TrinoException(NOT_SUPPORTED, "Unable to rename to %s columns when change data feed is enabled: %s".formatted(CHANGE_DATA_FEED_COLUMN_NAMES, newColumnName));
+        }
+
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
+        if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) {
+            throw new TrinoException(NOT_SUPPORTED, "Cannot rename column with the column mapping: " + columnMappingMode);
+        }
+
+        ConnectorTableMetadata tableMetadata = getTableMetadata(session, table);
+        long commitVersion = table.getReadVersion() + 1;
+        List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties()).stream()
+                .map(columnName -> columnName.equals(sourceColumnName) ? newColumnName : columnName)
+                .collect(toImmutableList());
+
+        List<DeltaLakeColumnHandle> columns = tableMetadata.getColumns().stream()
+                .filter(column -> !column.isHidden())
+                .map(column -> toColumnHandle(
+                        column.getName().equals(sourceColumnName) ? ColumnMetadata.builderFrom(column).setName(newColumnName).build() : column,
+                        column.getName().equals(sourceColumnName) ? newColumnName : column.getName(),
+                        column.getType(),
+                        partitionColumns))
+                .collect(toImmutableList());
+        Map<String, String> columnComments = getColumnComments(table.getMetadataEntry()).entrySet().stream()
+                .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
+                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+        Map<String, Boolean> columnsNullability = getColumnsNullability(table.getMetadataEntry()).entrySet().stream()
+                .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
+                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+        Map<String, Map<String, Object>> columnMetadata = getColumnsMetadata(table.getMetadataEntry()).entrySet().stream()
+                .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column)
+                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+        try {
+            TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation());
+            appendTableEntries(
+                    commitVersion,
+                    transactionLogWriter,
+                    table.getMetadataEntry().getId(),
+                    columns,
+                    partitionColumns,
+                    columnComments,
+                    columnsNullability,
+                    columnMetadata,
+                    table.getMetadataEntry().getConfiguration(),
+                    RENAME_COLUMN_OPERATION,
+                    session,
+                    Optional.ofNullable(table.getMetadataEntry().getDescription()),
+                    getProtocolEntry(session, table.getSchemaTableName()));
+            transactionLogWriter.flush();
+
+            statisticsAccess.readExtendedStatistics(session, table.getLocation()).ifPresent(existingStatistics -> {
+                ExtendedStatistics statistics = new ExtendedStatistics(
+                        existingStatistics.getModelVersion(),
+                        existingStatistics.getAlreadyAnalyzedModifiedTimeMax(),
+                        existingStatistics.getColumnStatistics().entrySet().stream()
+                                .map(stats -> stats.getKey().equals(sourceColumnName)
+                                        ? Map.entry(newColumnName, DeltaLakeColumnStatistics.create(stats.getValue().getTotalSizeInBytes(), stats.getValue().getNdvSummary()))
+                                        : stats)
+                                .collect(toImmutableMap(Entry::getKey, Entry::getValue)),
+                        existingStatistics.getAnalyzedColumns()
+                                .map(analyzedColumns -> analyzedColumns.stream().map(column -> column.equals(sourceColumnName) ? newColumnName : column).collect(toImmutableSet())));
+                statisticsAccess.updateExtendedStatistics(session, table.getLocation(), statistics);
+            });
+        }
+        catch (Exception e) {
+            throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to rename '%s' column for: %s.%s", sourceColumnName, table.getSchemaName(), table.getTableName()), e);
+        }
+    }
+
     private void appendTableEntries(
             long commitVersion,
             TransactionLogWriter transactionLogWriter,
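The method above is a metadata-only rename: it refuses to run unless the table's column mapping mode is NAME or ID, rejects reserved change data feed column names when CDF is enabled, and otherwise commits a new metadata entry to the transaction log under the RENAME COLUMN operation, carrying column comments, nullability, per-column metadata, and extended statistics over to the new name. A sketch of the two rejection paths as seen from SQL; the table names are hypothetical, and _change_type is assumed here as an example of a reserved CDF column name:

    -- Table with the default 'none' column mapping: rename is refused
    ALTER TABLE delta.default.plain_table RENAME COLUMN data TO new_data;
    -- fails with: Cannot rename column with the column mapping: NONE

    -- Table created WITH (change_data_feed_enabled = true): reserved CDF names are refused
    ALTER TABLE delta.default.cdf_table RENAME COLUMN col TO _change_type;
    -- fails with: Unable to rename to [...] columns when change data feed is enabled: _change_type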

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java

Lines changed: 38 additions & 0 deletions
@@ -334,6 +334,33 @@ public void testDropNonEmptySchemaWithTable()
         assertUpdate("DROP SCHEMA " + schemaName);
     }

+    @Override
+    public void testRenameColumn()
+    {
+        // Override because the connector doesn't support renaming columns with 'none' column mapping
+        // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
+        assertThatThrownBy(super::testRenameColumn)
+                .hasMessageContaining("Cannot rename column with the column mapping: NONE");
+    }
+
+    @Override
+    public void testAlterTableRenameColumnToLongName()
+    {
+        // Override because the connector doesn't support renaming columns with 'none' column mapping
+        // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
+        assertThatThrownBy(super::testAlterTableRenameColumnToLongName)
+                .hasMessageContaining("Cannot rename column with the column mapping: NONE");
+    }
+
+    @Override
+    public void testRenameColumnName(String columnName)
+    {
+        // Override because the connector doesn't support renaming columns with 'none' column mapping
+        // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
+        assertThatThrownBy(() -> super.testRenameColumnName(columnName))
+                .hasMessageContaining("Cannot rename column with the column mapping: NONE");
+    }
+
     @Override
     public void testCharVarcharComparison()
     {

@@ -879,6 +906,17 @@ public void testUnsupportedAddColumnWithChangeDataFeed(String columnName)
         }
     }

+    @Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
+    public void testUnsupportedRenameColumnWithChangeDataFeed(String columnName)
+    {
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_rename_column", "(col int) WITH (change_data_feed_enabled = true)")) {
+            assertQueryFails(
+                    "ALTER TABLE " + table.getName() + " RENAME COLUMN col TO " + columnName,
+                    "\\QUnable to rename to %s columns when change data feed is enabled: %s\\E".formatted(CHANGE_DATA_FEED_COLUMN_NAMES, columnName));
+            assertTableColumnNames(table.getName(), "col");
+        }
+    }
+
     @Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
     public void testUnsupportedSetTablePropertyWithChangeDataFeed(String columnName)
     {
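Because renameColumn rewrites the extended statistics entries under the new column name, statistics collected before a rename remain usable afterwards; the product test below verifies this end to end. A short sketch of the expected behavior, assuming an already analyzed, name-mapped table called stats_demo (a hypothetical name):

    ANALYZE delta.default.stats_demo;
    SHOW STATS FOR delta.default.stats_demo;   -- statistics reported for column b
    ALTER TABLE delta.default.stats_demo RENAME COLUMN b TO new_b;
    SHOW STATS FOR delta.default.stats_demo;   -- the same statistics now appear under new_b
    ANALYZE delta.default.stats_demo;          -- re-analyzing continues to work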

testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java

Lines changed: 171 additions & 0 deletions
@@ -894,6 +894,177 @@ public Object[][] columnMappingWithTrueAndFalseDataProvider()
         return cartesianProduct(supportedColumnMappingForDmlDataProvider(), trueFalse());
     }

+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testTrinoRenameColumnWithColumnMappingMode(String mode)
+    {
+        String tableName = "test_rename_column_" + randomNameSuffix();
+
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                " (id INT, data INT, part STRING)" +
+                " USING delta " +
+                " PARTITIONED BY (part) " +
+                " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
+                " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");
+
+            onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data");
+            onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN part TO new_part");
+
+            assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
+                    .containsOnly(
+                            row("id", "integer", "", ""),
+                            row("new_data", "integer", "", ""),
+                            row("new_part", "varchar", "", ""));
+
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 10, "part#1"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 10, "part#1"));
+
+            // Ensure renaming to the dropped column doesn't restore the old data
+            // TODO: Drop a column in Trino once the connector supports the syntax
+            assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN id"))
+                    .hasMessageContaining("This connector does not support dropping columns");
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");
+
+            assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
+                    .containsOnly(row(10, "part#1"));
+            assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
+                    .containsOnly(row(10, "part#1"));
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testSparkRenameColumnWithColumnMappingMode(String mode)
+    {
+        String tableName = "test_spark_rename_column_" + randomNameSuffix();
+
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                " (id INT, data INT, part STRING)" +
+                " USING delta " +
+                " PARTITIONED BY (part) " +
+                " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
+                " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");
+
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data");
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN part TO new_part");
+
+            assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
+                    .containsOnly(
+                            row("id", "integer", "", ""),
+                            row("new_data", "integer", "", ""),
+                            row("new_part", "varchar", "", ""));
+
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 10, "part#1"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 10, "part#1"));
+
+            // Ensure renaming to the dropped column doesn't restore the old data
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");
+
+            assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
+                    .containsOnly(row(10, "part#1"));
+            assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
+                    .containsOnly(row(10, "part#1"));
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testTrinoExtendedStatisticsRenameColumnWithColumnMappingMode(String mode)
+    {
+        String tableName = "test_rename_column_" + randomNameSuffix();
+
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                " (a INT, b INT)" +
+                " USING delta " +
+                " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
+                " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)");
+            onTrino().executeQuery("ANALYZE delta.default." + tableName);
+            assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
+                    .containsOnly(
+                            row("a", null, 1.0, 0.0, null, "1", "1"),
+                            row("b", null, 1.0, 0.0, null, "2", "2"),
+                            row(null, null, null, null, 1.0, null, null));
+
+            onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN b TO new_b");
+            assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
+                    .containsOnly(
+                            row("a", null, 1.0, 0.0, null, "1", "1"),
+                            row("new_b", null, 1.0, 0.0, null, "2", "2"),
+                            row(null, null, null, null, 1.0, null, null));
+
+            // Re-analyzing should work
+            onTrino().executeQuery("ANALYZE delta.default." + tableName);
+            assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
+                    .containsOnly(
+                            row("a", null, 1.0, 0.0, null, "1", "1"),
+                            row("new_b", null, 1.0, 0.0, null, "2", "2"),
+                            row(null, null, null, null, 1.0, null, null));
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testUnsupportedRenameColumnWithColumnMappingModeNone()
+    {
+        String tableName = "test_unsupported_rename_column_" + randomNameSuffix();
+
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                " (id INT, data INT)" +
+                " USING delta " +
+                " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
+                " TBLPROPERTIES ('delta.columnMapping.mode' = 'none')");
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10)");
+
+            assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data"))
+                    .hasMessageContaining("Cannot rename column with the column mapping: NONE");
+            assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data"))
+                    .hasMessageContaining(" Column rename is not supported for your Delta table");
+
+            assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
+                    .containsOnly(
+                            row("id", "integer", "", ""),
+                            row("data", "integer", "", ""));
+
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 10));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 10));
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE default." + tableName);
+        }
+    }
+
     @DataProvider
     public Object[][] columnMappingDataProvider()
     {
