Skip to content

Commit 07c151b

Browse files
committed
Support OPTIMIZE procedure in Delta Lake column mapping table
1 parent 5295f3d commit 07c151b

File tree

3 files changed

+134
-9
lines changed

3 files changed

+134
-9
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,11 +1800,6 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
18001800

18011801
checkWriteAllowed(session, table);
18021802
checkSupportedWriterVersion(session, table);
1803-
ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
1804-
if (columnMappingMode != NONE) {
1805-
// TODO https://github.com/trinodb/trino/issues/12638 Support 'optimize' table procedure for id and name column mapping mode
1806-
throw new TrinoException(NOT_SUPPORTED, "Executing 'optimize' procedure with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
1807-
}
18081803

18091804
return new BeginTableExecuteResult<>(
18101805
executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
@@ -1862,7 +1857,10 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
18621857
}
18631858

18641859
// Note: during writes we want to preserve original case of partition columns
1865-
List<String> partitionColumns = optimizeHandle.getMetadataEntry().getOriginalPartitionColumns();
1860+
List<String> partitionColumns = getPartitionColumns(
1861+
optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
1862+
optimizeHandle.getTableColumns(),
1863+
getColumnMappingMode(optimizeHandle.getMetadataEntry()));
18661864
appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, false);
18671865

18681866
transactionLogWriter.flush();

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,22 @@
2121
import com.google.common.io.Resources;
2222
import io.airlift.json.ObjectMapperProvider;
2323
import io.trino.filesystem.TrinoFileSystem;
24+
import io.trino.filesystem.TrinoInputFile;
2425
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
26+
import io.trino.filesystem.local.LocalInputFile;
27+
import io.trino.parquet.ParquetReaderOptions;
28+
import io.trino.parquet.reader.MetadataReader;
29+
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
2530
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
2631
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
32+
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
33+
import io.trino.plugin.hive.FileFormatDataSourceStats;
34+
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
2735
import io.trino.testing.AbstractTestQueryFramework;
2836
import io.trino.testing.QueryRunner;
37+
import org.apache.parquet.hadoop.metadata.FileMetaData;
38+
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
39+
import org.apache.parquet.schema.PrimitiveType;
2940
import org.assertj.core.api.Assertions;
3041
import org.testng.annotations.BeforeClass;
3142
import org.testng.annotations.DataProvider;
@@ -38,6 +49,7 @@
3849
import java.nio.file.Files;
3950
import java.nio.file.Path;
4051
import java.util.List;
52+
import java.util.Optional;
4153
import java.util.regex.Pattern;
4254
import java.util.stream.Stream;
4355

@@ -69,11 +81,15 @@ public class TestDeltaLakeBasic
6981
// The col-{uuid} pattern for delta.columnMapping.physicalName
7082
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
7183

84+
private static final TrinoFileSystem FILE_SYSTEM = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION);
85+
7286
@Override
7387
protected QueryRunner createQueryRunner()
7488
throws Exception
7589
{
76-
return createDeltaLakeQueryRunner(DELTA_CATALOG, ImmutableMap.of(), ImmutableMap.of("delta.register-table-procedure.enabled", "true"));
90+
return createDeltaLakeQueryRunner(DELTA_CATALOG, ImmutableMap.of(), ImmutableMap.of(
91+
"delta.register-table-procedure.enabled", "true",
92+
"delta.enable-non-concurrent-writes", "true"));
7793
}
7894

7995
@BeforeClass
@@ -208,6 +224,66 @@ public void testAddNestedColumnWithColumnMappingMode(String columnMappingMode)
208224
.containsPattern("(delta\\.columnMapping\\.physicalName.*?){11}");
209225
}
210226

227+
/**
228+
* @see deltalake.column_mapping_mode_id
229+
* @see deltalake.column_mapping_mode_name
230+
*/
231+
@Test(dataProvider = "columnMappingModeDataProvider")
232+
public void testOptimizeWithColumnMappingMode(String columnMappingMode)
233+
throws Exception
234+
{
235+
// The table contains 'x' column with column mapping mode
236+
String tableName = "test_optimize_" + randomNameSuffix();
237+
Path tableLocation = Files.createTempFile(tableName, null);
238+
copyDirectoryContents(new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath(), tableLocation);
239+
240+
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
241+
assertThat(query("DESCRIBE " + tableName)).projected("Column", "Type").skippingTypesCheck().matches("VALUES ('x', 'integer')");
242+
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);
243+
244+
MetadataEntry originalMetadata = loadMetadataEntry(0, tableLocation);
245+
JsonNode schema = OBJECT_MAPPER.readTree(originalMetadata.getSchemaString());
246+
List<JsonNode> fields = ImmutableList.copyOf(schema.get("fields").elements());
247+
Assertions.assertThat(fields).hasSize(1);
248+
JsonNode column = fields.get(0);
249+
String physicalName = column.get("metadata").get("delta.columnMapping.physicalName").asText();
250+
int id = column.get("metadata").get("delta.columnMapping.id").asInt();
251+
252+
assertUpdate("INSERT INTO " + tableName + " VALUES 10", 1);
253+
assertUpdate("INSERT INTO " + tableName + " VALUES 20", 1);
254+
assertUpdate("INSERT INTO " + tableName + " VALUES NULL", 1);
255+
assertUpdate("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
256+
257+
// Verify 'add' entry contains the expected physical name in the stats
258+
List<DeltaLakeTransactionLogEntry> transactionLog = getEntriesFromJson(4, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow();
259+
assertThat(transactionLog).hasSize(5);
260+
assertThat(transactionLog.get(0).getCommitInfo()).isNotNull();
261+
assertThat(transactionLog.get(1).getRemove()).isNotNull();
262+
assertThat(transactionLog.get(2).getRemove()).isNotNull();
263+
assertThat(transactionLog.get(3).getRemove()).isNotNull();
264+
assertThat(transactionLog.get(4).getAdd()).isNotNull();
265+
AddFileEntry addFileEntry = transactionLog.get(4).getAdd();
266+
DeltaLakeFileStatistics stats = addFileEntry.getStats().orElseThrow();
267+
assertThat(stats.getMinValues().orElseThrow().get(physicalName)).isEqualTo(10);
268+
assertThat(stats.getMaxValues().orElseThrow().get(physicalName)).isEqualTo(20);
269+
assertThat(stats.getNullCount(physicalName).orElseThrow()).isEqualTo(1);
270+
271+
// Verify optimized parquet file contains the expected physical id and name
272+
TrinoInputFile inputFile = new LocalInputFile(tableLocation.resolve(addFileEntry.getPath()).toFile());
273+
ParquetMetadata parquetMetadata = MetadataReader.readFooter(
274+
new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
275+
Optional.empty());
276+
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
277+
PrimitiveType physicalType = getOnlyElement(fileMetaData.getSchema().getColumns().iterator()).getPrimitiveType();
278+
assertThat(physicalType.getName()).isEqualTo(physicalName);
279+
if (columnMappingMode.equals("id")) {
280+
assertThat(physicalType.getId().intValue()).isEqualTo(id);
281+
}
282+
else {
283+
assertThat(physicalType.getId()).isNull();
284+
}
285+
}
286+
211287
@DataProvider
212288
public Object[][] columnMappingModeDataProvider()
213289
{

testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,8 +817,6 @@ public void testUnsupportedOperationsColumnMappingMode(String mode)
817817
" 'delta.minWriterVersion'='5')");
818818

819819
try {
820-
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE"))
821-
.hasMessageContaining("Executing 'optimize' procedure with column mapping %s is not supported".formatted(mode));
822820
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN a_number TO renamed_column"))
823821
.hasMessageContaining("This connector does not support renaming columns");
824822
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN a_number"))
@@ -829,6 +827,59 @@ public void testUnsupportedOperationsColumnMappingMode(String mode)
829827
}
830828
}
831829

830+
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingWithTrueAndFalseDataProvider")
831+
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
832+
public void testOptimizeProcedureColumnMappingMode(String mode, boolean partitioned)
833+
{
834+
String tableName = "test_dl_optimize_column_mapping_mode_" + randomNameSuffix();
835+
836+
onDelta().executeQuery("" +
837+
"CREATE TABLE default." + tableName +
838+
"(a_number INT, a_struct STRUCT<x: INT>, a_string STRING) " +
839+
"USING delta " +
840+
(partitioned ? "PARTITIONED BY (a_string)" : "") +
841+
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
842+
"TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')");
843+
844+
try {
845+
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, row(11), 'a')");
846+
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (2, row(22), 'b')");
847+
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, row(33), 'c')");
848+
849+
Double stringColumnSize = partitioned ? null : 3.0;
850+
List<Row> expectedStats = ImmutableList.<Row>builder()
851+
.add(row("a_number", null, 3.0, 0.0, null, "1", "3"))
852+
.add(row("a_struct", null, null, null, null, null, null))
853+
.add(row("a_string", stringColumnSize, 3.0, 0.0, null, null, null))
854+
.add(row(null, null, null, null, 3.0, null, null))
855+
.build();
856+
assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
857+
.containsOnly(expectedStats);
858+
859+
// Execute OPTIMIZE procedure and verify that the statistics is preserved and the table is still writable and readable
860+
onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE");
861+
862+
assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
863+
.containsOnly(expectedStats);
864+
865+
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (4, row(44), 'd')");
866+
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (5, named_struct('x',55), 'e')");
867+
868+
List<Row> expectedRows = ImmutableList.<Row>builder()
869+
.add(row(1, 11, "a"))
870+
.add(row(2, 22, "b"))
871+
.add(row(3, 33, "c"))
872+
.add(row(4, 44, "d"))
873+
.add(row(5, 55, "e"))
874+
.build();
875+
assertThat(onTrino().executeQuery("SELECT a_number, a_struct.x, a_string FROM delta.default." + tableName)).contains(expectedRows);
876+
assertThat(onDelta().executeQuery("SELECT a_number, a_struct.x, a_string FROM default." + tableName)).contains(expectedRows);
877+
}
878+
finally {
879+
dropDeltaTableWithRetry("default." + tableName);
880+
}
881+
}
882+
832883
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
833884
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
834885
public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode)

0 commit comments

Comments
 (0)