|
43 | 43 | import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
|
44 | 44 | import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
|
45 | 45 | import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
|
| 46 | +import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; |
46 | 47 | import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
|
47 | 48 | import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
|
48 | 49 | import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
|
@@ -266,9 +267,11 @@ public class DeltaLakeMetadata
|
266 | 267 | LAZY_SIMPLE_SERDE_CLASS,
|
267 | 268 | SEQUENCEFILE_INPUT_FORMAT_CLASS,
|
268 | 269 | HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
|
| 270 | + // Operation names in Delta Lake https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala |
269 | 271 | public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
|
270 | 272 | public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
|
271 | 273 | public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
|
| 274 | + public static final String RENAME_COLUMN_OPERATION = "RENAME COLUMN"; |
272 | 275 | public static final String INSERT_OPERATION = "WRITE";
|
273 | 276 | public static final String MERGE_OPERATION = "MERGE";
|
274 | 277 | public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
|
@@ -1145,6 +1148,78 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
|
1145 | 1148 | }
|
1146 | 1149 | }
|
1147 | 1150 |
|
| 1151 | + @Override |
| 1152 | + public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, String newColumnName) |
| 1153 | + { |
| 1154 | + DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; |
| 1155 | + DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle; |
| 1156 | + String sourceColumnName = deltaLakeColumn.getName(); |
| 1157 | + |
| 1158 | + ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry()); |
| 1159 | + if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { |
| 1160 | + throw new TrinoException(NOT_SUPPORTED, "Cannot rename column with the column mapping: " + columnMappingMode); |
| 1161 | + } |
| 1162 | + |
| 1163 | + ConnectorTableMetadata tableMetadata = getTableMetadata(session, table); |
| 1164 | + long commitVersion = table.getReadVersion() + 1; |
| 1165 | + List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties()).stream() |
| 1166 | + .map(columnName -> columnName.equals(sourceColumnName) ? newColumnName : columnName) |
| 1167 | + .collect(toImmutableList()); |
| 1168 | + |
| 1169 | + List<DeltaLakeColumnHandle> columns = tableMetadata.getColumns().stream() |
| 1170 | + .filter(column -> !column.isHidden()) |
| 1171 | + .map(column -> toColumnHandle( |
| 1172 | + column.getName().equals(sourceColumnName) ? ColumnMetadata.builderFrom(column).setName(newColumnName).build() : column, |
| 1173 | + column.getName().equals(sourceColumnName) ? newColumnName : column.getName(), |
| 1174 | + column.getType(), |
| 1175 | + partitionColumns)) |
| 1176 | + .collect(toImmutableList()); |
| 1177 | + Map<String, String> columnComments = getColumnComments(table.getMetadataEntry()).entrySet().stream() |
| 1178 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1179 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1180 | + Map<String, Boolean> columnsNullability = getColumnsNullability(table.getMetadataEntry()).entrySet().stream() |
| 1181 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1182 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1183 | + Map<String, Map<String, Object>> columnMetadata = getColumnsMetadata(table.getMetadataEntry()).entrySet().stream() |
| 1184 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1185 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1186 | + try { |
| 1187 | + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); |
| 1188 | + appendTableEntries( |
| 1189 | + commitVersion, |
| 1190 | + transactionLogWriter, |
| 1191 | + table.getMetadataEntry().getId(), |
| 1192 | + columns, |
| 1193 | + partitionColumns, |
| 1194 | + columnComments, |
| 1195 | + columnsNullability, |
| 1196 | + columnMetadata, |
| 1197 | + table.getMetadataEntry().getConfiguration(), |
| 1198 | + RENAME_COLUMN_OPERATION, |
| 1199 | + session, |
| 1200 | + Optional.ofNullable(table.getMetadataEntry().getDescription()), |
| 1201 | + getProtocolEntry(session, table.getSchemaTableName())); |
| 1202 | + transactionLogWriter.flush(); |
| 1203 | + |
| 1204 | + statisticsAccess.readExtendedStatistics(session, table.getLocation()).ifPresent(existingStatistics -> { |
| 1205 | + ExtendedStatistics statistics = new ExtendedStatistics( |
| 1206 | + existingStatistics.getModelVersion(), |
| 1207 | + existingStatistics.getAlreadyAnalyzedModifiedTimeMax(), |
| 1208 | + existingStatistics.getColumnStatistics().entrySet().stream() |
| 1209 | + .map(stats -> stats.getKey().equals(sourceColumnName) |
| 1210 | + ? Map.entry(newColumnName, DeltaLakeColumnStatistics.create(stats.getValue().getTotalSizeInBytes(), stats.getValue().getNdvSummary())) |
| 1211 | + : stats) |
| 1212 | + .collect(toImmutableMap(Entry::getKey, Entry::getValue)), |
| 1213 | + existingStatistics.getAnalyzedColumns() |
| 1214 | + .map(analyzedColumns -> analyzedColumns.stream().map(column -> column.equals(sourceColumnName) ? newColumnName : column).collect(toImmutableSet()))); |
| 1215 | + statisticsAccess.updateExtendedStatistics(session, table.getLocation(), statistics); |
| 1216 | + }); |
| 1217 | + } |
| 1218 | + catch (Exception e) { |
| 1219 | + throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to rename '%s' column for: %s.%s", sourceColumnName, table.getSchemaName(), table.getTableName()), e); |
| 1220 | + } |
| 1221 | + } |
| 1222 | + |
1148 | 1223 | private void appendTableEntries(
|
1149 | 1224 | long commitVersion,
|
1150 | 1225 | TransactionLogWriter transactionLogWriter,
|
|
0 commit comments