Commit a8f2488: addressing comments
nsivabalan committed Nov 20, 2024
Parent: 6e281a3
Showing 3 changed files with 33 additions and 24 deletions.
File 1 of 3: AbstractHoodieLogRecordScanner.java

@@ -659,10 +659,6 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
    */
   protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord);
 
-  public Set<String> getDeletedRecordKeys() {
-    throw new HoodieException("Inherited class needs to override to provide a concrete implementation");
-  }
-
   /**
    * Process the set of log blocks belonging to the last instant which is read fully.
    */

File 2 of 3: HoodieUnMergedLogRecordScanner.java

@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -33,9 +34,7 @@
 
 import org.apache.avro.Schema;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -44,17 +43,19 @@
 public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
 
   private final LogRecordScannerCallback callback;
-  private final Set<String> deletedRecordKeys = new HashSet<>();
+  private final CallbackForDeletedKeys callbackForDeletedKeys;
 
   private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean reverseReader, int bufferSize,
-                                         LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
+                                         LogRecordScannerCallback callback, CallbackForDeletedKeys callbackForDeletedKeys,
+                                         Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
         false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
         hoodieTableMetaClientOption);
     this.callback = callback;
+    this.callbackForDeletedKeys = callbackForDeletedKeys;
   }
 
   /**
@@ -86,12 +87,9 @@ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Except
 
   @Override
   protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    deletedRecordKeys.add(deleteRecord.getRecordKey());
-  }
-
-  @Override
-  public Set<String> getDeletedRecordKeys() {
-    return deletedRecordKeys;
+    if (callbackForDeletedKeys != null) {
+      callbackForDeletedKeys.apply(deleteRecord.getHoodieKey());
+    }
   }
 
   /**
@@ -103,6 +101,14 @@ public interface LogRecordScannerCallback {
     void apply(HoodieRecord<?> record) throws Exception;
   }
 
+  /**
+   * A callback for log record scanner to consume deleted HoodieKeys.
+   */
+  @FunctionalInterface
+  public interface CallbackForDeletedKeys {
+    void apply(HoodieKey deletedKey);
+  }
+
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
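
Note: routing deletes through CallbackForDeletedKeys keeps the unmerged scanner stateless and mirrors the existing LogRecordScannerCallback pattern, which is what allows the getDeletedRecordKeys() hook to be removed from the abstract scanner above. A minimal caller sketch (not part of this commit; collectDeletedKeys is a hypothetical helper, and the builder arguments are assumed to be in scope):

  // Sketch: collect deleted record keys while scanning log files unmerged.
  static Set<String> collectDeletedKeys(HoodieStorage storage, StoragePath basePath,
                                        List<String> logFilePaths, Schema readerSchema,
                                        String latestInstantTime) {
    Set<String> deletedKeys = new HashSet<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(storage)
        .withBasePath(basePath)
        .withLogFilePaths(logFilePaths)
        .withReaderSchema(readerSchema)
        .withLatestInstantTime(latestInstantTime)
        .withLogRecordScannerCallback(record -> { /* live records not needed here */ })
        .withLogRecordScannerCallbackForDeletedKeys(key -> deletedKeys.add(key.getRecordKey()))
        .build();
    scanner.scan(); // every delete flows through processNextDeletedRecord into the callback
    return deletedKeys;
  }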
@@ -118,6 +124,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
     private Option<InstantRange> instantRange = Option.empty();
     // specific configurations
     private LogRecordScannerCallback callback;
+    private CallbackForDeletedKeys callbackForDeletedKeys;
     private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
     private HoodieTableMetaClient hoodieTableMetaClient;
@@ -181,6 +188,11 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
       return this;
     }
 
+    public Builder withLogRecordScannerCallbackForDeletedKeys(CallbackForDeletedKeys callbackForDeletedKeys) {
+      this.callbackForDeletedKeys = callbackForDeletedKeys;
+      return this;
+    }
+
     @Override
     public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
       this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
@@ -205,7 +217,7 @@ public HoodieUnMergedLogRecordScanner build() {
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
-          latestInstantTime, reverseReader, bufferSize, callback, instantRange,
+          latestInstantTime, reverseReader, bufferSize, callback, callbackForDeletedKeys, instantRange,
           internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
     }
   }

File 3 of 3: HoodieTableMetadataUtil.java

@@ -787,13 +787,13 @@ static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngine
     int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
     String basePath = dataTableMetaClient.getBasePath().toString();
     // we might need to set some additional variables if we need to process log files.
-    boolean anyLogFilesWithDeleteBlocks = allWriteStats.stream().anyMatch(writeStat -> {
+    boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
       String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0;
+      return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
     });
     Option<String> latestCommitTimestamp = Option.empty();
     Option<Schema> writerSchemaOpt = Option.empty();
-    if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure deletes.
+    if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
       latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime());
       writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
     }
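
Note: the widened guard is the substance of this hunk. Previously a log file mixing inserts or updates with deletes was skipped entirely, so its deletes never reached the record-level index. A hedged illustration with a hypothetical write stat:

  // Hypothetical stat: a log file carrying 5 updates and 2 deletes.
  HoodieWriteStat writeStat = new HoodieWriteStat();
  writeStat.setNumUpdateWrites(5);
  writeStat.setNumDeletes(2);
  // Old guard: only delete-only log files qualified -> false here, deletes missed.
  boolean oldGuard = writeStat.getNumInserts() == 0
      && writeStat.getNumUpdateWrites() == 0
      && writeStat.getNumDeletes() > 0;
  // New guard: any log file with deletes qualifies -> true here.
  boolean newGuard = writeStat.getNumDeletes() > 0;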
@@ -808,9 +808,10 @@ static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngine
       if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
         return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
       } else {
-        // for logs, we only need to process delete blocks for RLI
-        if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) {
-          Set<String> deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient,
+        // for logs, we only need to process log files containing deletes
+        if (writeStat.getNumDeletes() > 0) {
+          StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+          Set<String> deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
               finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get());
           return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
         }
@@ -1397,7 +1398,7 @@ private static Set<String> getDeletedRecordKeys(String filePath, HoodieTableMeta
                                                   String latestCommitTimestamp) throws IOException {
     if (writerSchemaOpt.isPresent()) {
       // read log file records without merging
-      List<HoodieRecord> records = new ArrayList<>();
+      List<String> deletedKeys = new ArrayList<>();
       HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
           .withStorage(datasetMetaClient.getStorage())
           .withBasePath(datasetMetaClient.getBasePath())
@@ -1406,10 +1407,10 @@
           .withLatestInstantTime(latestCommitTimestamp)
           .withReaderSchema(writerSchemaOpt.get())
           .withTableMetaClient(datasetMetaClient)
+          .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey()))
           .build();
       scanner.scan();
-      // HoodieUnMergedLogRecordScanner will expose deleted record keys
-      return scanner.getDeletedRecordKeys();
+      return deletedKeys.stream().collect(Collectors.toSet());
     }
     return Collections.emptySet();
   }
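
Note: because CallbackForDeletedKeys receives the full HoodieKey rather than just the record key string, consumers are not limited to flat key sets. A sketch, assuming a hypothetical, partially configured scannerBuilder:

  // Hypothetical: group deleted keys by partition path instead of flattening them.
  Map<String, List<String>> deletedKeysByPartition = new HashMap<>();
  scannerBuilder.withLogRecordScannerCallbackForDeletedKeys(key ->
      deletedKeysByPartition
          .computeIfAbsent(key.getPartitionPath(), p -> new ArrayList<>())
          .add(key.getRecordKey()));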
