Skip to content

Commit

Permalink
[HUDI-7041] Optimize the memory usage of timeline server for table se…
Browse files Browse the repository at this point in the history
…rvice (apache#10002)
  • Loading branch information
zhuanshenbsj1 authored Nov 26, 2023
1 parent 4f875ed commit 499423c
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,17 +120,23 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));

List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList());
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
// Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server
// (remote or local embedded), thus to reduce the risk of an OOM exception.
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))));

partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList()));
}

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
boolean toDeletePartition = false;
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
Expand Down Expand Up @@ -329,7 +329,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
// all replaced file groups before earliestCommitToRetain are eligible to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain));
// add active files
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected Stream<FileSlice> getFileSlicesEligibleForClustering(String partition)
.collect(Collectors.toSet());
fgIdsInPendingCompactionLogCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));

return hoodieTable.getSliceView().getLatestFileSlices(partition)
return hoodieTable.getSliceView().getLatestFileSlicesStateless(partition)
// file ids already in clustering are not eligible
.filter(slice -> !fgIdsInPendingCompactionLogCompactionAndClustering.contains(slice.getFileGroupId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);

List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.getLatestFileSlicesStateless(partitionPath)
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange))
.map(s -> {
List<HoodieLogFile> logFiles = s.getLogFiles()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
return fileStatusMap;
}

/**
* Returns all files situated at the given partition.
*/
private FileStatus[] getAllFilesInPartition(String relativePartitionPath) throws IOException {
Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), relativePartitionPath);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = listPartition(partitionPath);
long endLsTs = System.currentTimeMillis();
LOG.debug("#files found in partition (" + relativePartitionPath + ") =" + statuses.length + ", Time taken ="
+ (endLsTs - beginLsTs));
return statuses;
}

/**
* Allows lazily loading the partitions if needed.
*
Expand All @@ -449,15 +462,7 @@ private void ensurePartitionLoadedCorrectly(String partition) {
// Not loaded yet
try {
LOG.info("Building file system view for partition (" + partitionPathStr + ")");

Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = listPartition(partitionPath);
long endLsTs = System.currentTimeMillis();
LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
+ (endLsTs - beginLsTs));
List<HoodieFileGroup> groups = addFilesToView(statuses);

List<HoodieFileGroup> groups = addFilesToView(getAllFilesInPartition(partitionPathStr));
if (groups.isEmpty()) {
storePartitionView(partitionPathStr, new ArrayList<>());
}
Expand Down Expand Up @@ -598,35 +603,49 @@ private FileSlice filterUncommittedLogs(FileSlice fileSlice) {
}

protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
return addBootstrapBaseFileIfPresent(fileGroup, this::getBootstrapBaseFile);
}

protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
.anyMatch(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
if (hasBootstrapBaseFile) {
HoodieFileGroup newFileGroup = new HoodieFileGroup(fileGroup);
newFileGroup.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS))
.forEach(fs -> fs.setBaseFile(
addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get())));
addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get(), bootstrapBaseFileMappingFunc)));
return newFileGroup;
}
return fileGroup;
}

protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) {
return addBootstrapBaseFileIfPresent(fileSlice, this::getBootstrapBaseFile);
}

protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
if (fileSlice.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
FileSlice copy = new FileSlice(fileSlice);
copy.getBaseFile().ifPresent(dataFile -> {
Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(copy.getFileGroupId());
edf.ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
bootstrapBaseFileMappingFunc.apply(copy.getFileGroupId()).ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
});
return copy;
}
return fileSlice;
}

protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGroupId, HoodieBaseFile baseFile) {
return addBootstrapBaseFileIfPresent(fileGroupId, baseFile, this::getBootstrapBaseFile);
}

protected HoodieBaseFile addBootstrapBaseFileIfPresent(
HoodieFileGroupId fileGroupId,
HoodieBaseFile baseFile,
Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
if (baseFile.getCommitTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
HoodieBaseFile copy = new HoodieBaseFile(baseFile);
Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(fileGroupId);
edf.ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
bootstrapBaseFileMappingFunc.apply(fileGroupId).ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
return copy;
}
return baseFile;
Expand Down Expand Up @@ -706,7 +725,6 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
try {
readLock.lock();

List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
return formattedPartitionList.stream().collect(Collectors.toMap(
Function.identity(),
Expand Down Expand Up @@ -824,6 +842,31 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
}
}

@Override
public final Stream<FileSlice> getLatestFileSlicesStateless(String partitionStr) {
String partition = formatPartitionKey(partitionStr);
if (isPartitionAvailableInStore(partition)) {
return getLatestFileSlices(partition);
} else {
try {
Stream<FileSlice> fileSliceStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream()
.filter(fg -> !isFileGroupReplaced(fg))
.map(HoodieFileGroup::getLatestFileSlice)
.filter(Option::isPresent).map(Option::get)
.flatMap(slice -> this.filterUncommittedFiles(slice, true));
if (bootstrapIndex.useIndex()) {
final Map<HoodieFileGroupId, BootstrapBaseFileMapping> bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
if (!bootstrapBaseFileMappings.isEmpty()) {
return fileSliceStream.map(fileSlice -> addBootstrapBaseFileIfPresent(fileSlice, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
}
}
return fileSliceStream;
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch all files in partition " + partition, e);
}
}
}

/**
* Get Latest File Slice for a given fileId in a given partition.
*/
Expand Down Expand Up @@ -1014,6 +1057,39 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

@Override
public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionStr) {
String partition = formatPartitionKey(partitionStr);
if (isPartitionAvailableInStore(partition)) {
return getAllFileGroups(partition);
} else {
try {
Stream<HoodieFileGroup> fileGroupStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream()
.filter(fg -> !isFileGroupReplaced(fg));
if (bootstrapIndex.useIndex()) {
final Map<HoodieFileGroupId, BootstrapBaseFileMapping> bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
if (!bootstrapBaseFileMappings.isEmpty()) {
return fileGroupStream.map(fileGroup -> addBootstrapBaseFileIfPresent(fileGroup, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
}
}
return fileGroupStream;
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch all files in partition " + partition, e);
}
}
}

private Map<HoodieFileGroupId, BootstrapBaseFileMapping> getBootstrapBaseFileMappings(String partition) {
try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
LOG.info("Bootstrap Index available for partition " + partition);
List<BootstrapFileMapping> sourceFileMappings =
reader.getSourceFileMappingForPartition(partition);
return sourceFileMappings.stream()
.map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId()), s.getBootstrapFileStatus())).collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, s -> s));
}
}

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
Expand All @@ -1029,22 +1105,38 @@ private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String p

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
}
return Stream.empty();
}

@Override
Expand Down Expand Up @@ -1263,6 +1355,11 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
*/
protected abstract void removeReplacedFileIdsAtInstants(Set<String> instants);

/**
* Returns whether there are replaced files within the given partition.
*/
protected abstract boolean hasReplacedFilesInPartition(String partitionPath);

/**
* Track instant time for file groups replaced.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ protected void removeReplacedFileIdsAtInstants(Set<String> instants) {
fgIdToReplaceInstants.entrySet().removeIf(entry -> instants.contains(entry.getValue().getTimestamp()));
}

@Override
protected boolean hasReplacedFilesInPartition(String partitionPath) {
return fgIdToReplaceInstants.keySet().stream().anyMatch(fg -> fg.getPartitionPath().equals(partitionPath));
}

@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGroupId) {
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
return execute(partitionPath, preferredView::getLatestFileSlices, secondaryView::getLatestFileSlices);
}

@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
return execute(partitionPath, preferredView::getLatestFileSlicesStateless, secondaryView::getLatestFileSlicesStateless);
}

@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
return execute(partitionPath, preferredView::getLatestUnCompactedFileSlices,
Expand Down Expand Up @@ -222,6 +227,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroupsStateless, secondaryView::getAllFileGroupsStateless);
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
Expand Down
Loading

0 comments on commit 499423c

Please sign in to comment.