diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index 7d324e529602..76c22f96e726 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -39,7 +39,6 @@ import org.apache.spark.TaskContext$; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import scala.Tuple2; @@ -86,11 +85,10 @@ private Runnable getPreExecuteRunnable() { // Test to ensure that we are reading all records from queue iterator in the same order // without any exceptions. - @Disabled("Disabled for unblocking 0.12.2 release. Disruptor queue is not part of this minor release. Tracked in HUDI-5410") @SuppressWarnings("unchecked") @Test @Timeout(value = 60) - public void testRecordReading() { + public void testRecordReading() throws Exception { final List hoodieRecords = dataGen.generateInserts(instantTime, 100); ArrayList beforeRecord = new ArrayList<>(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java index ce441bf2e289..140e7ff5b633 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java @@ -18,9 +18,21 @@ package org.apache.hudi.hadoop; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -30,8 +42,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewManager; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; @@ -39,42 +50,21 @@ import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; - -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.Nonnull; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; -import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS; -import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE; -import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; -import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig; -import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus; +import static org.apache.hudi.common.util.ValidationUtils.checkState; /** * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's @@ -200,7 +190,7 @@ protected List listStatusForIncrementalMode(JobConf job, return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); } - protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option instantOpt, String basePath, Option virtualKeyInfoOpt) { + protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option virtualKeyInfoOpt) { Option baseFileOpt = fileSlice.getBaseFile(); if (baseFileOpt.isPresent()) { @@ -233,7 +223,6 @@ private List listStatusForSnapshotMode(JobConf job, Map> groupedPaths = HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); - Map fsViewCache = new HashMap<>(); for (Map.Entry> entry : groupedPaths.entrySet()) { HoodieTableMetaClient tableMetaClient = entry.getKey(); @@ -247,83 +236,33 @@ private List listStatusForSnapshotMode(JobConf job, boolean shouldIncludePendingCommits = HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName()); - // NOTE: Fetching virtual key info is a costly operation as it needs to load the commit metadata. - // This is only needed for MOR realtime splits. Hence, for COW tables, this can be avoided. - Option virtualKeyInfoOpt = tableMetaClient.getTableType().equals(COPY_ON_WRITE) ? Option.empty() : getHoodieVirtualKeyInfo(tableMetaClient); - String basePath = tableMetaClient.getBasePathV2().toString(); - - if (conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS) && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) { - HiveHoodieTableFileIndex fileIndex = - new HiveHoodieTableFileIndex( - engineContext, - tableMetaClient, - props, - HoodieTableQueryType.SNAPSHOT, - partitionPaths, - queryCommitInstant, - shouldIncludePendingCommits); - - Map> partitionedFileSlices = fileIndex.listFileSlices(); - - targetFiles.addAll( - partitionedFileSlices.values() - .stream() - .flatMap(Collection::stream) - .filter(fileSlice -> checkIfValidFileSlice(fileSlice)) - .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(), basePath, virtualKeyInfoOpt)) - .collect(Collectors.toList()) - ); - } else { - HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits); - Option queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp)); - validateInstant(timeline, queryInstant); - - try { - HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient -> - FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient, buildMetadataConfig(job), timeline)); - - List filteredFileSlices = new ArrayList<>(); - - for (Path p : entry.getValue()) { - String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p); - - List fileSlices = queryInstant.map( - instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant)) - .orElse(fsView.getLatestFileSlices(relativePartitionPath)) - .collect(Collectors.toList()); - - filteredFileSlices.addAll(fileSlices); - } - - targetFiles.addAll( - filteredFileSlices.stream() - .filter(fileSlice -> checkIfValidFileSlice(fileSlice)) - .map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(), basePath, virtualKeyInfoOpt)) - .collect(Collectors.toList())); - } finally { - fsViewCache.forEach(((metaClient, fsView) -> fsView.close())); - } - } + HiveHoodieTableFileIndex fileIndex = + new HiveHoodieTableFileIndex( + engineContext, + tableMetaClient, + props, + HoodieTableQueryType.SNAPSHOT, + partitionPaths, + queryCommitInstant, + shouldIncludePendingCommits); + + Map> partitionedFileSlices = fileIndex.listFileSlices(); + + Option virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient); + + targetFiles.addAll( + partitionedFileSlices.values() + .stream() + .flatMap(Collection::stream) + .filter(fileSlice -> checkIfValidFileSlice(fileSlice)) + .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt)) + .collect(Collectors.toList()) + ); } return targetFiles; } - private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) { - HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline(); - if (shouldIncludePendingCommits) { - return timeline; - } else { - return timeline.filterCompletedAndCompactionInstants(); - } - } - - private static void validateInstant(HoodieTimeline activeTimeline, Option queryInstant) { - if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) { - throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get())); - } - } - protected boolean checkIfValidFileSlice(FileSlice fileSlice) { Option baseFileOpt = fileSlice.getBaseFile(); Option latestLogFileOpt = fileSlice.getLatestLogFile(); @@ -338,10 +277,15 @@ protected boolean checkIfValidFileSlice(FileSlice fileSlice) { } } + private void validate(List targetFiles, List legacyFileStatuses) { + List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + @Nonnull protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { try { - return getFileStatus(baseFile); + return HoodieInputFormatUtils.getFileStatus(baseFile); } catch (IOException ioe) { throw new HoodieIOException("Failed to get file-status", ioe); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java index 6a198f9ad338..95a1a74b65b9 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java @@ -18,6 +18,16 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -33,23 +43,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.HiveHoodieTableFileIndex; import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat; import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; import org.apache.hudi.hadoop.RealtimeFileStatus; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapreduce.Job; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -86,14 +86,14 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { } @Override - protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, - Option latestCompletedInstantOpt, - String tableBasePath, - Option virtualKeyInfoOpt) { + protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option virtualKeyInfoOpt) { Option baseFileOpt = fileSlice.getBaseFile(); Option latestLogFileOpt = fileSlice.getLatestLogFile(); Stream logFiles = fileSlice.getLogFiles(); + Option latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant(); + String tableBasePath = fileIndex.getBasePath().toString(); + // Check if we're reading a MOR table if (baseFileOpt.isPresent()) { return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index eeeedc061e68..f9c2c9ca29be 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -449,7 +449,7 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) { * @param dataFile * @return */ - public static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) { + private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) { Path dataPath = dataFile.getFileStatus().getPath(); try { if (dataFile.getFileSize() == 0) {