diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 33fa3ebe2da5..10aa50efe3e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -659,10 +659,6 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
    */
   protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord);
 
-  public Set<String> getDeletedRecordKeys() {
-    throw new HoodieException("Inherited class needs to override to provide a concrete implementation");
-  }
-
   /**
    * Process the set of log blocks belonging to the last instant which is read fully.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 09b767f04a88..e807be2bcee1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -33,9 +34,7 @@
 
 import org.apache.avro.Schema;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -44,17 +43,19 @@
 public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
 
   private final LogRecordScannerCallback callback;
-  private final Set<String> deletedRecordKeys = new HashSet<>();
+  private final CallbackForDeletedKeys callbackForDeletedKeys;
 
   private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean reverseReader, int bufferSize,
-                                         LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
+                                         LogRecordScannerCallback callback, CallbackForDeletedKeys callbackForDeletedKeys,
+                                         Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
         false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
         hoodieTableMetaClientOption);
     this.callback = callback;
+    this.callbackForDeletedKeys = callbackForDeletedKeys;
   }
 
   /**
@@ -86,12 +87,9 @@ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Except
 
   @Override
   protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    deletedRecordKeys.add(deleteRecord.getRecordKey());
-  }
-
-  @Override
-  public Set<String> getDeletedRecordKeys() {
-    return deletedRecordKeys;
+    if (callbackForDeletedKeys != null) {
+      callbackForDeletedKeys.apply(deleteRecord.getHoodieKey());
+    }
   }
 
   /**
@@ -103,6 +101,14 @@ public interface LogRecordScannerCallback {
     void apply(HoodieRecord<?> record) throws Exception;
   }
 
+  /**
+   * A callback for log record scanner to consume deleted HoodieKeys.
+   */
+  @FunctionalInterface
+  public interface CallbackForDeletedKeys {
+    void apply(HoodieKey deletedKey);
+  }
+
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
@@ -118,6 +124,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
     private Option<InstantRange> instantRange = Option.empty();
     // specific configurations
     private LogRecordScannerCallback callback;
+    private CallbackForDeletedKeys callbackForDeletedKeys;
     private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
     private HoodieTableMetaClient hoodieTableMetaClient;
@@ -181,6 +188,11 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
       return this;
     }
 
+    public Builder withLogRecordScannerCallbackForDeletedKeys(CallbackForDeletedKeys callbackForDeletedKeys) {
+      this.callbackForDeletedKeys = callbackForDeletedKeys;
+      return this;
+    }
+
     @Override
     public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
       this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
@@ -205,7 +217,7 @@ public HoodieUnMergedLogRecordScanner build() {
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
-          latestInstantTime, reverseReader, bufferSize, callback, instantRange,
+          latestInstantTime, reverseReader, bufferSize, callback, callbackForDeletedKeys, instantRange,
           internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index c1f561a705a9..5626e88b0fe6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -787,13 +787,13 @@ static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngine
     int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
     String basePath = dataTableMetaClient.getBasePath().toString();
     // we might need to set some additional variables if we need to process log files.
-    boolean anyLogFilesWithDeleteBlocks = allWriteStats.stream().anyMatch(writeStat -> {
+    boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
       String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0;
+      return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
     });
     Option<String> latestCommitTimestamp = Option.empty();
     Option<Schema> writerSchemaOpt = Option.empty();
-    if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure deletes.
+    if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
       latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime());
       writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
     }
@@ -808,9 +808,10 @@ static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngine
         if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
           return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
         } else {
-          // for logs, we only need to process delete blocks for RLI
-          if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) {
-            Set<String> deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient,
+          // for logs, we only need to process log files containing deletes
+          if (writeStat.getNumDeletes() > 0) {
+            StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+            Set<String> deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
                 finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get());
             return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
           }
@@ -1397,7 +1398,7 @@ private static Set<String> getDeletedRecordKeys(String filePath, HoodieTableMeta
                                                   String latestCommitTimestamp) throws IOException {
     if (writerSchemaOpt.isPresent()) {
       // read log file records without merging
-      List<String> records = new ArrayList<>();
+      List<String> deletedKeys = new ArrayList<>();
       HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
           .withStorage(datasetMetaClient.getStorage())
           .withBasePath(datasetMetaClient.getBasePath())
@@ -1406,10 +1407,10 @@ private static Set<String> getDeletedRecordKeys(String filePath, HoodieTableMeta
           .withLatestInstantTime(latestCommitTimestamp)
          .withReaderSchema(writerSchemaOpt.get())
           .withTableMetaClient(datasetMetaClient)
+          .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey()))
           .build();
       scanner.scan();
-      // HoodieUnMergedLogRecordScanner will expose deleted record keys
-      return scanner.getDeletedRecordKeys();
+      return deletedKeys.stream().collect(Collectors.toSet());
     }
     return Collections.emptySet();
   }
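
Usage note: with `getDeletedRecordKeys()` removed, callers now receive deleted keys through the new builder hook instead of pulling them from the scanner afterwards. Below is a minimal caller-side sketch of the pattern this patch establishes; the helper name `collectDeletedKeys` and its parameters are illustrative, not part of the patch, and `withLogFilePaths` is assumed from the existing base builder. The builder calls mirror the updated `getDeletedRecordKeys` in `HoodieTableMetadataUtil` above.

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;

import org.apache.avro.Schema;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeletedKeysExample {
  // Illustrative helper: scans the given log files without merging and
  // collects the record keys seen in delete blocks via the new callback.
  static Set<String> collectDeletedKeys(HoodieTableMetaClient metaClient,
                                        List<String> logFilePaths,
                                        Schema readerSchema,
                                        String latestCommitTimestamp) {
    Set<String> deletedKeys = new HashSet<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(metaClient.getStorage())
        .withBasePath(metaClient.getBasePath())
        .withLogFilePaths(logFilePaths)
        .withLatestInstantTime(latestCommitTimestamp)
        .withReaderSchema(readerSchema)
        .withTableMetaClient(metaClient)
        // New hook from this patch: invoked once per DeleteRecord with its HoodieKey.
        .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey()))
        .build();
    scanner.scan();
    return deletedKeys;
  }
}
```

Compared to the old getter, the callback avoids buffering keys inside the scanner and lets the caller choose the accumulation strategy (a `HashSet` here, matching the `Collectors.toSet()` in `HoodieTableMetadataUtil`).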