Skip to content

Commit

Permalink
[core] Refactor TagManager to remove unnecessary tag existence check
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 3, 2025
1 parent 00cf16e commit ba08428
Show file tree
Hide file tree
Showing 31 changed files with 189 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.io.IOException;
import java.util.function.Supplier;

/**
* A value-producing functional interface whose {@link #get()} may throw an {@link IOException}.
*
* <p>It mirrors the shape of {@link Supplier} but does not extend it, because {@link
* Supplier#get()} cannot declare a checked exception.
*
* @param <T> the type of the value produced
*/
@FunctionalInterface
public interface SupplierWithIOException<T> {

/**
* Produces a value.
*
* @return the produced value
* @throws IOException if producing the value fails with an I/O error
*/
T get() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SupplierWithIOException;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
Expand Down Expand Up @@ -322,13 +323,13 @@ protected List<FileStatus> tryBestListingDirs(Path dir) {
* {@link FileNotFoundException}, return default value. Finally, if retry times reaches the
* limits, rethrow the IOException.
*/
protected static <T> T retryReadingFiles(ReaderWithIOException<T> reader, T defaultValue)
protected static <T> T retryReadingFiles(SupplierWithIOException<T> reader, T defaultValue)
throws IOException {
int retryNumber = 0;
IOException caught = null;
while (retryNumber++ < READ_FILE_RETRY_NUM) {
try {
return reader.read();
return reader.get();
} catch (FileNotFoundException e) {
return defaultValue;
} catch (IOException e) {
Expand All @@ -349,13 +350,6 @@ protected boolean oldEnough(FileStatus status) {
return status.getModificationTime() < olderThanMillis;
}

/** A helper functional interface for method {@link #retryReadingFiles}. */
@FunctionalInterface
protected interface ReaderWithIOException<T> {

T read() throws IOException;
}

public static SerializableConsumer<Path> createFileCleaner(
Catalog catalog, @Nullable Boolean dryRun) {
SerializableConsumer<Path> fileCleaner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ private Optional<TableSchema> travelToVersion(String version, Options options) {
}

private Optional<TableSchema> travelToTag(String tagName, Options options) {
return travelToSnapshot(tagManager().taggedSnapshot(tagName), options);
return travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
}

private Optional<TableSchema> travelToSnapshot(long snapshotId, Options options) {
Expand Down Expand Up @@ -633,7 +633,9 @@ public void createTag(String tagName, Duration timeRetained) {
}

private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
tagManager()
.createTag(
fromSnapshot, tagName, timeRetained, store().createTagCallbacks(), false);
}

@Override
Expand Down Expand Up @@ -689,7 +691,7 @@ public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName);

Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
Snapshot taggedSnapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
rollbackHelper().cleanLargerThan(taggedSnapshot);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,27 @@
/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {

private final String start;
private final String end;
private final Snapshot start;
private final Snapshot end;

public IncrementalTagStartingScanner(
SnapshotManager snapshotManager, String start, String end) {
SnapshotManager snapshotManager, String startTagName, String endTagName) {
super(snapshotManager);
this.start = start;
this.end = end;
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
if (startingSnapshot != null) {
this.startingSnapshotId = startingSnapshot.id();
}
}

@Override
public Result scan(SnapshotReader reader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot tag1 = tagManager.taggedSnapshot(start);
Snapshot tag2 = tagManager.taggedSnapshot(end);

if (tag2.id() <= tag1.id()) {
start = tagManager.getOrThrow(startTagName).trimToSnapshot();
end = tagManager.getOrThrow(endTagName).trimToSnapshot();
if (end.id() <= start.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tag2.id(), start, tag1.id()));
endTagName, end.id(), startTagName, start.id()));
}
this.startingSnapshotId = start.id();
}

return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
@Override
public Result scan(SnapshotReader reader) {
return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ScanMode startingScanMode() {
public SnapshotReader configure(SnapshotReader snapshotReader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ private static Snapshot resolveSnapshotByTagName(
String tagName = options.scanTagName();
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
return tagManager.taggedSnapshot(tagName);
return tagManager.getOrThrow(tagName).trimToSnapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -227,27 +228,25 @@ public RecordReader<InternalRow> createReader(Split split) {
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
tagManager.get(equalValue).ifPresent(tag -> predicateMap.put(equalValue, tag));
}

if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
List<String> tagNames = new ArrayList<>();
InPredicateVisitor.extractInElements(predicate, TAG_NAME)
.ifPresent(
leafs ->
leafs.forEach(
leaf -> {
String leftName = leaf.toString();
if (tagManager.tagExists(leftName)) {
predicateMap.put(
leftName,
tagManager.tag(leftName));
}
}));
e ->
e.stream()
.map(Object::toString)
.forEach(tagNames::add));
tagNames.forEach(
name ->
tagManager
.get(name)
.ifPresent(value -> predicateMap.put(name, value)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ private void tryToCreateTags(Snapshot snapshot) {
}
String tagName = periodHandler.timeToTag(thisTag);
LOG.info("The tag name is {}.", tagName);
if (!tagManager.tagExists(tagName)) {
tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks);
}
// shouldn't throw exception when tag exists
tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks, true);
nextTag = periodHandler.nextTagTime(thisTag);
LOG.info("The next tag time after this is {}.", nextTag);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ branchName, branchPath(tablePath, branchName)),

public void createBranch(String branchName, String tagName) {
validateBranch(branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);

Snapshot snapshot = tagManager.taggedSnapshot(tagName);
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();

try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
Expand Down
87 changes: 49 additions & 38 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -97,31 +98,34 @@ public List<Path> tagPaths(Predicate<Path> predicate) throws IOException {

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot, String tagName, Duration timeRetained, List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
Snapshot snapshot,
String tagName,
Duration timeRetained,
List<TagCallback> callbacks,
boolean ignoreIfExists) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
if (tagExists(tagName)) {
checkArgument(ignoreIfExists, "Tag '%s' already exists.", tagName);
return;
}
createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
}

/** Replace a tag from given snapshot and save it in the storage. */
public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, null);
}

public void createOrReplaceTag(
private void createOrReplaceTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
@Nullable List<TagCallback> callbacks) {
// When timeRetained is not defined, please do not write the tagCreatorTime field,
// as this will cause older versions (<= 0.7) of readers to be unable to read this
// tag.
// When timeRetained is defined, it is fine, because timeRetained is the new
// feature.
// When timeRetained is not defined, please do not write the tagCreatorTime field, as this
// will cause older versions (<= 0.7) of readers to be unable to read this tag.
// When timeRetained is defined, it is fine, because timeRetained is the new feature.
String content =
timeRetained != null
? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now())
Expand Down Expand Up @@ -152,17 +156,17 @@ public void createOrReplaceTag(
}

public void renameTag(String tagName, String targetTagName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName),
"Original tag name shouldn't be blank.");
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);

checkArgument(
!StringUtils.isNullOrWhitespaceOnly(targetTagName),
"New tag name shouldn't be blank.");
// The rename target must not exist yet; the message names the conflicting target tag.
checkArgument(!tagExists(targetTagName), "Tag '%s' already exists.", targetTagName);

try {
if (!tagExists(tagName)) {
throw new RuntimeException(
String.format("The specified tag name [%s] does not exist.", tagName));
}
if (tagExists(targetTagName)) {
throw new RuntimeException(
String.format(
"The specified target tag name [%s] existed, please set a non-existent tag name.",
targetTagName));
}
fileIO.rename(tagPath(tagName), tagPath(targetTagName));
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -172,7 +176,7 @@ public void renameTag(String tagName, String targetTagName) {
/** Make sure the tagNames are ALL tags of one snapshot. */
public void deleteAllTagsOfOneSnapshot(
List<String> tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
Snapshot taggedSnapshot = getOrThrow(tagNames.get(0)).trimToSnapshot();
List<Snapshot> taggedSnapshots;

// skip file deletion if snapshot exists
Expand All @@ -188,19 +192,20 @@ public void deleteAllTagsOfOneSnapshot(
doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
}

/** Ignore errors if the tag doesn't exist. */
public void deleteTag(
String tagName,
TagDeletion tagDeletion,
SnapshotManager snapshotManager,
List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
if (!tagExists(tagName)) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
Optional<Tag> tag = get(tagName);
if (!tag.isPresent()) {
LOG.warn("Tag '{}' doesn't exist.", tagName);
return;
}

Snapshot taggedSnapshot = taggedSnapshot(tagName);
Snapshot taggedSnapshot = tag.get().trimToSnapshot();
List<Snapshot> taggedSnapshots;

// skip file deletion if snapshot exists
Expand Down Expand Up @@ -303,10 +308,21 @@ public boolean tagExists(String tagName) {
}
}

/** Get the tagged snapshot by name. */
public Snapshot taggedSnapshot(String tagName) {
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot();
/**
 * Looks up a tag by name.
 *
 * @param tagName name of the tag; must not be null or whitespace-only
 * @return the tag read from its tag path, or {@link Optional#empty()} when reading it raises a
 *     {@link FileNotFoundException}
 * @throws IllegalArgumentException if {@code tagName} is blank
 */
public Optional<Tag> get(String tagName) {
    checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
    Optional<Tag> found;
    try {
        found = Optional.of(Tag.tryFromPath(fileIO, tagPath(tagName)));
    } catch (FileNotFoundException missing) {
        // A missing tag file is an expected outcome here, not an error.
        found = Optional.empty();
    }
    return found;
}

/**
 * Looks up a tag by name and fails if it is absent.
 *
 * @param tagName name of the tag to read
 * @return the tag returned by {@link #get(String)}
 * @throws IllegalArgumentException if {@link #get(String)} yields no tag for {@code tagName}
 */
public Tag getOrThrow(String tagName) {
    Optional<Tag> found = get(tagName);
    if (!found.isPresent()) {
        throw new IllegalArgumentException("Tag '" + tagName + "' doesn't exist.");
    }
    return found.get();
}

public long tagCount() {
Expand Down Expand Up @@ -410,12 +426,7 @@ private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
}
throw new RuntimeException(
String.format(
"Didn't find tag with snapshot id '%s'.This is unexpected.",
"Didn't find tag with snapshot id '%s'. This is unexpected.",
taggedSnapshot.id()));
}

/** Read tag for tagName. */
public Tag tag(String tagName) {
return Tag.fromPath(fileIO, tagPath(tagName));
}
}
Loading

0 comments on commit ba08428

Please sign in to comment.