Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix race condition for earliest snapshot #4930

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) {
}

public static Changelog fromPath(FileIO fileIO, Path path) {
try {
return tryFromPath(fileIO, path);
} catch (FileNotFoundException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
}

public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
} catch (FileNotFoundException e) {
throw e;
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
Expand Down
102 changes: 90 additions & 12 deletions paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,28 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
return snapshot;
}

private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo(
long snapshotId, long stopSnapshotId) {
while (snapshotId <= stopSnapshotId) {
try {
return tryGetSnapshot(snapshotId);
} catch (FileNotFoundException e) {
snapshotId++;
}
}
return null;
}

public Changelog changelog(long snapshotId) {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.fromPath(fileIO, changelogPath);
}

private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.tryFromPath(fileIO, changelogPath);
}

public Changelog longLivedChangelog(long snapshotId) {
return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
}
Expand Down Expand Up @@ -209,7 +226,27 @@ public boolean longLivedChangelogExists(long snapshotId) {

public @Nullable Snapshot earliestSnapshot() {
Long snapshotId = earliestSnapshotId();
return snapshotId == null ? null : snapshot(snapshotId);
if (snapshotId == null) {
return null;
}

Long latestSnapshotId = null;
do {
try {
return tryGetSnapshot(snapshotId);
} catch (FileNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here?

LOG.warn(
"The earliest snapshot was once identified but disappeared. "
+ "It might have been expired by other jobs operating on this table. "
+ "Searching for the second earliest snapshot instead. ");
snapshotId++;
if (latestSnapshotId == null) {
latestSnapshotId = latestSnapshotId();
}
}
} while (latestSnapshotId != null && snapshotId <= latestSnapshotId);

return null;
}

public @Nullable Long earliestSnapshotId() {
Expand Down Expand Up @@ -268,25 +305,47 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}
}

private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException {
if (longLivedChangelogExists(snapshotId)) {
return tryGetChangelog(snapshotId);
} else {
return tryGetSnapshot(snapshotId);
}
}

/**
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
* returned if all snapshots are equal to or later than the timestamp mills.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
Long earliestSnapshot = earliestSnapshotId();
Long earliestSnapshotId = earliestSnapshotId();
Long earliest;
if (startFromChangelog) {
Long earliestChangelog = earliestLongLivedChangelogId();
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
Long earliestChangelogId = earliestLongLivedChangelogId();
earliest = earliestChangelogId == null ? earliestSnapshotId : earliestChangelogId;
} else {
earliest = earliestSnapshot;
earliest = earliestSnapshotId;
}
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
return null;
}

if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
Snapshot earliestSnapshot = null;
while (earliest <= latest) {
try {
earliestSnapshot = tryGetChangelogOrSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here?

LOG.warn(
"The earliest snapshot or changelog was once identified but disappeared. "
+ "It might have been expired by other jobs operating on this table. "
+ "Searching for the second earliest snapshot or changelog instead. ");
earliest++;
}
}

if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) {
return earliest - 1;
}

Expand All @@ -312,8 +371,9 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return null;
}

Snapshot earliestSnapShot = snapshot(earliest);
if (earliestSnapShot.timeMillis() > timestampMills) {
Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
return earliestSnapShot;
}
Snapshot finalSnapshot = null;
Expand Down Expand Up @@ -375,12 +435,23 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Snapshot snapshot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);
if (snapshot == null) {
continue;
}
earliestWatermark = snapshot.watermark();
if (earliestWatermark != null) {
break;
}
Expand All @@ -391,7 +462,7 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}

if (earliestWatermark >= watermark) {
return snapshot(earliest);
return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);
}
Snapshot finalSnapshot = null;

Expand Down Expand Up @@ -434,9 +505,16 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Expand Down
Loading
Loading