|
43 | 43 | import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
|
44 | 44 | import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
|
45 | 45 | import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
|
| 46 | +import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; |
46 | 47 | import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
|
47 | 48 | import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
|
48 | 49 | import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
|
@@ -272,9 +273,11 @@ public class DeltaLakeMetadata
|
272 | 273 | LAZY_SIMPLE_SERDE_CLASS,
|
273 | 274 | SEQUENCEFILE_INPUT_FORMAT_CLASS,
|
274 | 275 | HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
|
| 276 | + // Operation names in Delta Lake https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala |
275 | 277 | public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
|
276 | 278 | public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
|
277 | 279 | public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
|
| 280 | + public static final String RENAME_COLUMN_OPERATION = "RENAME COLUMN"; |
278 | 281 | public static final String INSERT_OPERATION = "WRITE";
|
279 | 282 | public static final String MERGE_OPERATION = "MERGE";
|
280 | 283 | public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
|
@@ -1225,6 +1228,80 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
|
1225 | 1228 | }
|
1226 | 1229 | }
|
1227 | 1230 |
|
| 1231 | + @Override |
| 1232 | + public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, String newColumnName) |
| 1233 | + { |
| 1234 | + DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; |
| 1235 | + DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle; |
| 1236 | + String sourceColumnName = deltaLakeColumn.getName(); |
| 1237 | + |
| 1238 | + ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry()); |
| 1239 | + if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { |
| 1240 | + throw new TrinoException(NOT_SUPPORTED, "Cannot rename column with the column mapping: " + columnMappingMode); |
| 1241 | + } |
| 1242 | + |
| 1243 | + ConnectorTableMetadata tableMetadata = getTableMetadata(session, table); |
| 1244 | + long commitVersion = table.getReadVersion() + 1; |
| 1245 | + List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties()).stream() |
| 1246 | + .map(columnName -> columnName.equals(sourceColumnName) ? newColumnName : columnName) |
| 1247 | + .collect(toImmutableList()); |
| 1248 | + |
| 1249 | + List<DeltaLakeColumnHandle> columns = tableMetadata.getColumns().stream() |
| 1250 | + .filter(column -> !column.isHidden()) |
| 1251 | + .map(column -> toColumnHandle( |
| 1252 | + column.getName().equals(sourceColumnName) ? ColumnMetadata.builderFrom(column).setName(newColumnName).build() : column, |
| 1253 | + column.getName().equals(sourceColumnName) ? newColumnName : column.getName(), |
| 1254 | + column.getType(), |
| 1255 | + partitionColumns)) |
| 1256 | + .collect(toImmutableList()); |
| 1257 | + Map<String, String> columnComments = getColumnComments(table.getMetadataEntry()).entrySet().stream() |
| 1258 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1259 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1260 | + Map<String, Boolean> columnsNullability = getColumnsNullability(table.getMetadataEntry()).entrySet().stream() |
| 1261 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1262 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1263 | + Map<String, Map<String, Object>> columnMetadata = getColumnsMetadata(table.getMetadataEntry()).entrySet().stream() |
| 1264 | + .map(column -> column.getKey().equals(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) |
| 1265 | + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 1266 | + try { |
| 1267 | + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); |
| 1268 | + appendTableEntries( |
| 1269 | + commitVersion, |
| 1270 | + transactionLogWriter, |
| 1271 | + table.getMetadataEntry().getId(), |
| 1272 | + columns, |
| 1273 | + partitionColumns, |
| 1274 | + columnComments, |
| 1275 | + columnsNullability, |
| 1276 | + columnMetadata, |
| 1277 | + table.getMetadataEntry().getConfiguration(), |
| 1278 | + RENAME_COLUMN_OPERATION, |
| 1279 | + session, |
| 1280 | + nodeVersion, |
| 1281 | + nodeId, |
| 1282 | + Optional.ofNullable(table.getMetadataEntry().getDescription()), |
| 1283 | + getProtocolEntry(session, table.getSchemaTableName())); |
| 1284 | + transactionLogWriter.flush(); |
| 1285 | + |
| 1286 | + statisticsAccess.readExtendedStatistics(session, table.getLocation()).ifPresent(existingStatistics -> { |
| 1287 | + ExtendedStatistics statistics = new ExtendedStatistics( |
| 1288 | + existingStatistics.getModelVersion(), |
| 1289 | + existingStatistics.getAlreadyAnalyzedModifiedTimeMax(), |
| 1290 | + existingStatistics.getColumnStatistics().entrySet().stream() |
| 1291 | + .map(stats -> stats.getKey().equals(sourceColumnName) |
| 1292 | + ? Map.entry(newColumnName, DeltaLakeColumnStatistics.create(stats.getValue().getTotalSizeInBytes(), stats.getValue().getNdvSummary())) |
| 1293 | + : stats) |
| 1294 | + .collect(toImmutableMap(Entry::getKey, Entry::getValue)), |
| 1295 | + existingStatistics.getAnalyzedColumns() |
| 1296 | + .map(analyzedColumns -> analyzedColumns.stream().map(column -> column.equals(sourceColumnName) ? newColumnName : column).collect(toImmutableSet()))); |
| 1297 | + statisticsAccess.updateExtendedStatistics(session, table.getLocation(), statistics); |
| 1298 | + }); |
| 1299 | + } |
| 1300 | + catch (Exception e) { |
| 1301 | + throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to rename '%s' column for: %s.%s", sourceColumnName, table.getSchemaName(), table.getTableName()), e); |
| 1302 | + } |
| 1303 | + } |
| 1304 | + |
1228 | 1305 | private static void appendTableEntries(
|
1229 | 1306 | long commitVersion,
|
1230 | 1307 | TransactionLogWriter transactionLogWriter,
|
|
0 commit comments