diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 6588f6d63079..03ec64c33860 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -92,7 +92,7 @@ private Runnable getPreExecuteRunnable() {
   @SuppressWarnings("unchecked")
   @Test
   @Timeout(value = 60)
-  public void testRecordReading() throws Exception {
+  public void testRecordReading() {
     final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
     ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 75504cdd132d..04b65d8878aa 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -27,11 +28,13 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +55,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -59,7 +63,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 
 /**
  * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
@@ -195,7 +199,8 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
     return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }
 
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
+                                                 String tableBasePath, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
 
     if (baseFileOpt.isPresent()) {
@@ -241,31 +246,86 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
       boolean shouldIncludePendingCommits =
           HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());
 
-      HiveHoodieTableFileIndex fileIndex =
-          new HiveHoodieTableFileIndex(
-              engineContext,
-              tableMetaClient,
-              props,
-              HoodieTableQueryType.SNAPSHOT,
-              partitionPaths,
-              queryCommitInstant,
-              shouldIncludePendingCommits);
-
-      Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
-
-      targetFiles.addAll(
-          partitionedFileSlices.values()
-              .stream()
-              .flatMap(Collection::stream)
-              .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
-              .collect(Collectors.toList())
-      );
+      if (HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient) || conf.getBoolean(ENABLE.key(), ENABLE.defaultValue())) {
+        HiveHoodieTableFileIndex fileIndex =
+            new HiveHoodieTableFileIndex(
+                engineContext,
+                tableMetaClient,
+                props,
+                HoodieTableQueryType.SNAPSHOT,
+                partitionPaths,
+                queryCommitInstant,
+                shouldIncludePendingCommits);
+
+        Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
+
+        targetFiles.addAll(
+            partitionedFileSlices.values()
+                .stream()
+                .flatMap(Collection::stream)
+                .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
+                .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(),
+                    fileIndex.getBasePath().toString(), tableMetaClient))
+                .collect(Collectors.toList())
+        );
+      } else {
+        // If hoodie.metadata.enabled is set to false and the table doesn't have the metadata,
+        // read the table using fs view cache instead of file index.
+        // This is because there's no file index in non-metadata table.
+        String basePath = tableMetaClient.getBasePathV2().toString();
+        Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
+        HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
+        Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp));
+        validateInstant(timeline, queryInstant);
+
+        try {
+          HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
+              FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient,
+                  HoodieInputFormatUtils.buildMetadataConfig(job), timeline));
+
+          List<FileSlice> filteredFileSlices = new ArrayList<>();
+
+          for (Path p : entry.getValue()) {
+            String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p);
+
+            List<FileSlice> fileSlices = queryInstant.map(
+                instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
+                .orElse(fsView.getLatestFileSlices(relativePartitionPath))
+                .collect(Collectors.toList());
+
+            filteredFileSlices.addAll(fileSlices);
+          }
+
+          targetFiles.addAll(
+              filteredFileSlices.stream()
+                  .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
+                  .map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(),
+                      basePath, tableMetaClient))
+                  .collect(Collectors.toList()));
+        } finally {
+          fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
+        }
+      }
     }
 
     return targetFiles;
   }
 
+  private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
+    HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
+    if (shouldIncludePendingCommits) {
+      return timeline;
+    } else {
+      return timeline.filterCompletedAndCompactionInstants();
+    }
+  }
+
+  private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
+    if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
+      throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
+    }
+  }
+
   protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
@@ -280,11 +340,6 @@ protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
     }
   }
 
-  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
-    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
-    checkState(diff.isEmpty(), "Should be empty");
-  }
-
   @Nonnull
   protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
     try {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 3719718e95aa..0cfe0d0a1940 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -36,7 +36,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
-import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
 import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.RealtimeFileStatus;
@@ -92,14 +91,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   }
 
   @Override
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
+                                                 String tableBasePath, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
     Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
 
-    Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
-    String tableBasePath = fileIndex.getBasePath().toString();
-
     // Check if we're reading a MOR table
     if (baseFileOpt.isPresent()) {
       return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));