Skip to content

Commit

Permalink
Core: Throw CommitStateUnknownException if RuntimeException that is n…
Browse files Browse the repository at this point in the history
…ot marked as cleanable is thrown
  • Loading branch information
amogh-jahagirdar committed Jun 17, 2024
1 parent 4859b55 commit 244f3b6
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -308,12 +309,15 @@ private void replaceDataFiles(
LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
throw e;
} catch (Exception e) {
LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
if (e instanceof CleanableFailure) {
LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -119,8 +120,12 @@ public void commitOrClean(Set<RewriteFileGroup> rewriteGroups) {
e);
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abortFileGroup);
if (e instanceof CleanableFailure) {
LOG.error(
"Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abortFileGroup);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -102,8 +103,12 @@ public void commitOrClean(Set<RewritePositionDeletesGroup> rewriteGroups) {
e);
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abort);
if (e instanceof CleanableFailure) {
LOG.error(
"Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abort);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -354,10 +355,13 @@ private void replaceManifests(
} catch (CommitStateUnknownException commitStateUnknownException) {
// don't clean up added manifest files, because they may have been successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
throw e;
} catch (Throwable failure) {
if (failure instanceof CleanableFailure) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}

throw failure;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
Expand Down Expand Up @@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkPositionDeltaWrite(
SparkSession spark,
Expand Down Expand Up @@ -304,9 +304,12 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Throwable failure) {
if (failure instanceof CleanableFailure) {
cleanupOnAbort = true;
}

throw failure;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
Expand Down Expand Up @@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkWrite(
SparkSession spark,
Expand Down Expand Up @@ -233,9 +233,12 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Throwable failure) {
if (failure instanceof CleanableFailure) {
cleanupOnAbort = true;
}

throw failure;
}
}

Expand Down

0 comments on commit 244f3b6

Please sign in to comment.