From e94c13a8baf8cbd707d8abffb2872617343c20db Mon Sep 17 00:00:00 2001
From: Michiel De Smet
Date: Mon, 2 Dec 2024 16:20:14 +0800
Subject: [PATCH] Register VACUUM operations in the delta log

---
 .../plugin/deltalake/DeltaLakeConfig.java     |  14 +++
 .../deltalake/DeltaLakeSessionProperties.java |  11 ++
 .../deltalake/procedure/VacuumProcedure.java  | 119 +++++++++++++++---
 .../BaseDeltaLakeConnectorSmokeTest.java      |  32 +++++
 .../plugin/deltalake/TestDeltaLakeConfig.java |   7 +-
 5 files changed, 164 insertions(+), 19 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index 2438414cb124..ada3f735bf75 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -70,6 +70,7 @@ public class DeltaLakeConfig
     private long defaultCheckpointWritingInterval = 10;
     private boolean checkpointFilteringEnabled = true;
     private Duration vacuumMinRetention = new Duration(7, DAYS);
+    private boolean vacuumTransactionLoggingEnabled;
     private Optional<String> hiveCatalogName = Optional.empty();
     private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
     private boolean tableStatisticsEnabled = true;
@@ -280,6 +281,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
         return this;
     }
 
+    public boolean isVacuumTransactionLoggingEnabled()
+    {
+        return vacuumTransactionLoggingEnabled;
+    }
+
+    @Config("delta.vacuum.transaction-logging.enabled")
+    @ConfigDescription("Whether to log vacuum information into the Delta transaction log")
+    public DeltaLakeConfig setVacuumTransactionLoggingEnabled(boolean vacuumTransactionLoggingEnabled)
+    {
+        this.vacuumTransactionLoggingEnabled = vacuumTransactionLoggingEnabled;
+        return this;
+    }
+
     public Optional<String> getHiveCatalogName()
     {
         return hiveCatalogName;
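
The new `delta.vacuum.transaction-logging.enabled` flag above is an ordinary catalog property. A minimal sketch of a catalog file that turns it on — the file name, connector spelling, and metastore URI here are illustrative assumptions, not part of this patch:

```properties
# etc/catalog/delta.properties (hypothetical catalog configuration)
connector.name=delta_lake
hive.metastore.uri=thrift://metastore.example.com:9083
# Added by this patch; defaults to false
delta.vacuum.transaction-logging.enabled=true
```
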
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
index 01e744dc3047..4bb0aaa89729 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
@@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
 {
     public static final String MAX_SPLIT_SIZE = "max_split_size";
     public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
+    public static final String VACUUM_TRANSACTION_LOGGING_ENABLED = "vacuum_transaction_logging_enabled";
     private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
     private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
@@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
                         "Minimal retention period for vacuum procedure",
                         deltaLakeConfig.getVacuumMinRetention(),
                         false),
+                booleanProperty(
+                        VACUUM_TRANSACTION_LOGGING_ENABLED,
+                        "Whether to log vacuum information into the Delta transaction log",
+                        deltaLakeConfig.isVacuumTransactionLoggingEnabled(),
+                        false),
                 stringProperty(
                         HIVE_CATALOG_NAME,
                         "Catalog to redirect to when a Hive table is referenced",
@@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
         return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
     }
 
+    public static boolean isVacuumLoggingEnabled(ConnectorSession session)
+    {
+        return session.getProperty(VACUUM_TRANSACTION_LOGGING_ENABLED, Boolean.class);
+    }
+
     public static Optional<String> getHiveCatalogName(ConnectorSession session)
     {
         return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));
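
The session property mirrors the catalog flag, so logging can be toggled per session through `isVacuumLoggingEnabled` without reconfiguring the catalog. A sketch, assuming the catalog is named `delta`:

```sql
-- Override the catalog default for the current session only
SET SESSION delta.vacuum_transaction_logging_enabled = true;
```
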
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
index 9ee792497097..0ec86eaf19a1 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
@@ -14,6 +14,7 @@ package io.trino.plugin.deltalake.procedure;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -32,11 +33,15 @@ import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
+import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
 import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
+import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
+import io.trino.spi.NodeManager;
 import io.trino.spi.TrinoException;
 import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.classloader.ThreadContextClassLoader;
@@ -52,6 +57,7 @@ import java.lang.invoke.MethodHandle;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -62,12 +68,15 @@ import static com.google.common.base.Predicates.alwaysFalse;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
 import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
+import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.DELETION_VECTORS_FEATURE_NAME;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
@@ -100,18 +109,26 @@ public class VacuumProcedure
     private final TrinoFileSystemFactory fileSystemFactory;
     private final DeltaLakeMetadataFactory metadataFactory;
     private final TransactionLogAccess transactionLogAccess;
+    private final TransactionLogWriterFactory transactionLogWriterFactory;
+    private final String nodeVersion;
+    private final String nodeId;
 
     @Inject
     public VacuumProcedure(
             CatalogName catalogName,
             TrinoFileSystemFactory fileSystemFactory,
             DeltaLakeMetadataFactory metadataFactory,
-            TransactionLogAccess transactionLogAccess)
+            TransactionLogAccess transactionLogAccess,
+            TransactionLogWriterFactory transactionLogWriterFactory,
+            NodeManager nodeManager)
     {
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
+        this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
+        this.nodeVersion = nodeManager.getCurrentNode().getVersion();
+        this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
     }
 
     @Override
@@ -166,6 +183,7 @@ private void doVacuum(
         Duration retentionDuration = Duration.valueOf(retention);
         Duration minRetention = getVacuumMinRetention(session);
+        boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
         checkProcedureArgument(
                 retentionDuration.compareTo(minRetention) >= 0,
                 "Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
@@ -206,10 +224,10 @@ private void doVacuum(
             throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(DELETION_VECTORS_FEATURE_NAME));
         }
 
-        String tableLocation = tableSnapshot.getTableLocation();
-        String transactionLogDir = getTransactionLogDir(tableLocation);
+        String tableLocationString = tableSnapshot.getTableLocation();
+        String transactionLogDir = getTransactionLogDir(tableLocationString);
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-        String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
+        String commonPathPrefix = tableLocationString.endsWith("/") ? tableLocationString : tableLocationString + "/";
         String queryId = session.getQueryId();
 
         // Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent").
@@ -239,7 +257,7 @@ private void doVacuum(
                             .map(DeltaLakeTransactionLogEntry::getRemove)
                             .filter(Objects::nonNull)
                             .map(RemoveFileEntry::path))
-                    .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
+                    .peek(path -> checkState(!path.startsWith(tableLocationString), "Unexpected absolute path in transaction log: %s", path))
                     .collect(toImmutableSet());
         }
 
@@ -247,34 +265,44 @@ private void doVacuum(
                 "[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention",
                 queryId,
                 tableName,
-                tableLocation,
+                tableLocationString,
                 retention,
                 threshold,
                 retainedPaths.size());
 
+        Location tableLocation = Location.of(tableLocationString);
         long allPathsChecked = 0;
         long transactionLogFiles = 0;
         long retainedKnownFiles = 0;
         long retainedUnknownFiles = 0;
         List<TrinoInputFile> filesToDelete = new ArrayList<>();
+        Set<String> vacuumedDirectories = new HashSet<>();
+        vacuumedDirectories.add(tableLocation.path());
+        long filesToDeleteSize = 0;
 
-        FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
+        FileIterator listing = fileSystem.listFiles(tableLocation);
         while (listing.hasNext()) {
             FileEntry entry = listing.next();
+
             String location = entry.location().toString();
             checkState(
                     location.startsWith(commonPathPrefix),
                     "Unexpected path [%s] returned when listing files under [%s]",
                     location,
-                    tableLocation);
+                    tableLocationString);
             String relativePath = location.substring(commonPathPrefix.length());
             if (relativePath.isEmpty()) {
-                // A file returned for "tableLocation/", might be possible on S3.
+                // A file returned for "tableLocationString/", might be possible on S3.
                 continue;
             }
             allPathsChecked++;
+            Location parentDirectory = entry.location().parentDirectory();
+            while (!areDirectoryLocationsEquivalent(parentDirectory, tableLocation)) {
+                vacuumedDirectories.add(parentDirectory.path());
+                parentDirectory = parentDirectory.parentDirectory();
+            }
 
-            // ignore tableLocation/_delta_log/**
+            // ignore tableLocationString/_delta_log/**
             if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) {
                 log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location);
                 transactionLogFiles++;
@@ -300,22 +328,38 @@ private void doVacuum(
             Location fileLocation = Location.of(location);
             TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
             filesToDelete.add(inputFile);
+            filesToDeleteSize += inputFile.length();
+        }
+
+        long readVersion = handle.getReadVersion();
+        if (isVacuumLoggingEnabled) {
+            logVacuumStart(handle.location(), session, readVersion, filesToDelete.size(), filesToDeleteSize);
         }
 
         int totalFilesToDelete = filesToDelete.size();
         int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
-        for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
-            int start = batchNumber * DELETE_BATCH_SIZE;
-            int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
+        try {
+            for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
+                int start = batchNumber * DELETE_BATCH_SIZE;
+                int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
 
-            List<TrinoInputFile> batch = filesToDelete.subList(start, end);
-            fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
+                List<TrinoInputFile> batch = filesToDelete.subList(start, end);
+                fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
+            }
+        }
+        catch (IOException e) {
+            if (isVacuumLoggingEnabled) {
+                // This mimics Delta behaviour where it sets metrics to 0 in case of a failure
+                logVacuumEnd(handle.location(), session, readVersion, 0, 0, "FAILED");
+            }
+            throw e;
+        }
+        if (isVacuumLoggingEnabled) {
+            logVacuumEnd(handle.location(), session, readVersion, filesToDelete.size(), vacuumedDirectories.size(), "COMPLETED");
+        }
-
         log.info(
                 "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
                 queryId,
                 tableName,
-                tableLocation,
+                tableLocationString,
                 allPathsChecked,
                 transactionLogFiles,
                 retainedKnownFiles,
@@ -323,4 +367,45 @@ private void doVacuum(
                 retainedUnknownFiles,
                 totalFilesToDelete);
         }
     }
+
+    private void logVacuumStart(String location, ConnectorSession session, long readVersion, long numFilesToDelete, long filesToDeleteSize)
+            throws IOException
+    {
+        long createdTime = System.currentTimeMillis();
+        long commitVersion = readVersion + 1;
+
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM START", ImmutableMap.of("queryId", session.getQueryId()), ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)), readVersion));
+
+        transactionLogWriter.flush();
+    }
+
+    private void logVacuumEnd(String location, ConnectorSession session, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
+            throws IOException
+    {
+        long createdTime = System.currentTimeMillis();
+        long commitVersion = readVersion + 2;
+
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM END", ImmutableMap.of("queryId", session.getQueryId(), "status", status), ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)), readVersion));
+        transactionLogWriter.flush();
+    }
+
+    private CommitInfoEntry getCommitInfoEntry(long commitVersion, long createdTime, ConnectorSession session, String operation, ImmutableMap<String, String> operationParameters, ImmutableMap<String, String> operationMetrics, long readVersion)
+    {
+        return new CommitInfoEntry(
+                commitVersion,
+                createdTime,
+                session.getUser(),
+                session.getUser(),
+                operation,
+                operationParameters,
+                null,
+                null,
+                "trino-" + nodeVersion + "-" + nodeId,
+                readVersion,
+                IsolationLevel.WRITESERIALIZABLE.getValue(),
+                Optional.of(true),
+                operationMetrics);
+    }
 }
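
With logging enabled, each vacuum now writes two commits to `_delta_log`: `VACUUM START` at `readVersion + 1` and `VACUUM END` at `readVersion + 2`. The following is an illustrative sketch of the resulting `commitInfo` entry — all values (user, timestamp, version, query ID, node version and identifier) are made up, and the exact JSON produced when `CommitInfoEntry` is serialized may differ:

```json
{
  "commitInfo": {
    "timestamp": 1733127614000,
    "userId": "alice",
    "userName": "alice",
    "operation": "VACUUM START",
    "operationParameters": {"queryId": "20241202_082014_00042_xxxxx"},
    "engineInfo": "trino-468-<nodeId>",
    "readVersion": 1,
    "isolationLevel": "WriteSerializable",
    "isBlindAppend": true,
    "operationMetrics": {"numFilesToDelete": "5", "sizeOfDataToDelete": "5012"}
  }
}
```
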
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index b143ce622d3a..8d80814dbfb7 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner()
                         .put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
                         .put("delta.register-table-procedure.enabled", "true")
                         .put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
+                        .put("delta.vacuum.transaction-logging.enabled", "true")
                         .putAll(deltaStorageConfiguration())
                         .buildOrThrow())
                 .setSchemaLocation(getLocationForTable(bucketName, SCHEMA))
@@ -1716,6 +1717,9 @@ public void testVacuum()
         Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
                 .setCatalogSessionProperty(catalog, "vacuum_min_retention", "0s")
                 .build();
+        Session sessionWithVacuumLoggingDisabled = Session.builder(sessionWithShortRetentionUnlocked)
+                .setCatalogSessionProperty(catalog, "vacuum_transaction_logging_enabled", "false")
+                .build();
         assertUpdate(
                 format("CREATE TABLE %s WITH (location = '%s', partitioned_by = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", tableName, tableLocation),
                 25);
@@ -1746,6 +1750,34 @@ public void testVacuum()
             assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
             // old files should be cleaned up
             assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
+            // operations should be logged
+            assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId') FROM \"" + tableName + "$history\"")).matches("""
+                    VALUES\s
+                    (CAST(0 AS BIGINT), CAST('CREATE TABLE AS SELECT' AS VARCHAR), CAST(MAP() AS MAP(VARCHAR, VARCHAR))),\s
+                    (1, 'MERGE', MAP()),\s
+                    (2, 'VACUUM START', MAP()),\s
+                    (3, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED'])),\s
+                    (4, 'VACUUM START', MAP()),\s
+                    (5, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']))""");
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['sizeOfDataToDelete'] FROM \"" + tableName + "$history\" WHERE version = 2"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numFilesToDelete'] FROM \"" + tableName + "$history\" WHERE version = 2"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numDeletedFiles'] FROM \"" + tableName + "$history\" WHERE version = 3"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numVacuumedDirectories'] FROM \"" + tableName + "$history\" WHERE version = 3"))).isEqualTo(8);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['sizeOfDataToDelete'] FROM \"" + tableName + "$history\" WHERE version = 4"))).isGreaterThanOrEqualTo(5012);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numFilesToDelete'] FROM \"" + tableName + "$history\" WHERE version = 4"))).isEqualTo(5);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numDeletedFiles'] FROM \"" + tableName + "$history\" WHERE version = 5"))).isEqualTo(5);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numVacuumedDirectories'] FROM \"" + tableName + "$history\" WHERE version = 5"))).isEqualTo(8);
+
+            assertUpdate(sessionWithVacuumLoggingDisabled, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "', retention => '1s')");
+            // no new vacuum logging operations are logged
+            assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"", """
+                    VALUES\s
+                    (0, 'CREATE TABLE AS SELECT'),\s
+                    (1, 'MERGE'),\s
+                    (2, 'VACUUM START'),\s
+                    (3, 'VACUUM END'),\s
+                    (4, 'VACUUM START'),\s
+                    (5, 'VACUUM END')""");
         }
         finally {
             assertUpdate("DROP TABLE " + tableName);
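
The assertions above reduce to SQL that can be run against any vacuumed table; a usage sketch with a hypothetical table name:

```sql
-- List the vacuum operations recorded in the transaction log
SELECT version,
       operation,
       element_at(operation_parameters, 'status') AS status,
       element_at(operation_metrics, 'numDeletedFiles') AS num_deleted_files
FROM "nation$history"
WHERE operation IN ('VACUUM START', 'VACUUM END')
ORDER BY version;
```
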
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index 1158f3ceeb83..e08e9b3bc863 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -75,7 +75,8 @@ public void testDefaults()
                 .setQueryPartitionFilterRequired(false)
                 .setDeletionVectorsEnabled(false)
                 .setDeltaLogFileSystemCacheDisabled(false)
-                .setMetadataParallelism(8));
+                .setMetadataParallelism(8)
+                .setVacuumTransactionLoggingEnabled(false));
     }
 
     @Test
@@ -118,6 +119,7 @@ public void testExplicitPropertyMappings()
                 .put("delta.deletion-vectors-enabled", "true")
                 .put("delta.fs.cache.disable-transaction-log-caching", "true")
                 .put("delta.metadata.parallelism", "10")
+                .put("delta.vacuum.transaction-logging.enabled", "true")
                 .buildOrThrow();
 
         DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -156,7 +158,8 @@ public void testExplicitPropertyMappings()
                 .setQueryPartitionFilterRequired(true)
                 .setDeletionVectorsEnabled(true)
                 .setDeltaLogFileSystemCacheDisabled(true)
-                .setMetadataParallelism(10);
+                .setMetadataParallelism(10)
+                .setVacuumTransactionLoggingEnabled(true);
 
         assertFullMapping(properties, expected);
     }
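
For completeness, the disabled path exercised by `sessionWithVacuumLoggingDisabled` corresponds to the following session-level toggles; the catalog name `delta` and table name are assumptions:

```sql
-- Suppress transaction logging for a one-off vacuum, even when the catalog enables it
SET SESSION delta.vacuum_min_retention = '0s';
SET SESSION delta.vacuum_transaction_logging_enabled = false;
CALL delta.system.vacuum(schema_name => CURRENT_SCHEMA, table_name => 'nation', retention => '1s');
```
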