From 244f3b652e419e05007af6ceb49f6687b99a2b21 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Fri, 24 May 2024 08:54:28 -0600
Subject: [PATCH] Core: Throw CommitStateUnknownException if RuntimeException that is not marked as cleanable is thrown

---
 .../actions/BaseRewriteDataFilesAction.java     | 16 ++++++++++------
 .../actions/RewriteDataFilesCommitManager.java  |  9 +++++++--
 .../RewritePositionDeletesCommitManager.java    |  9 +++++++--
 .../actions/RewriteManifestsSparkAction.java    | 12 ++++++++----
 .../spark/source/SparkPositionDeltaWrite.java   | 13 ++++++++-----
 .../apache/iceberg/spark/source/SparkWrite.java | 13 ++++++++-----
 6 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 5f229be579b7..c0f2fc6174b3 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -308,12 +309,15 @@ private void replaceDataFiles(
       LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
       throw e;
     } catch (Exception e) {
-      LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-          .run(fileIO::deleteFile);
+      if (e instanceof CleanableFailure) {
+        LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
+        Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+            .noRetry()
+            .suppressFailureWhenFinished()
+            .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+            .run(fileIO::deleteFile);
+      }
+
       throw e;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 7f89db467d73..45b4bcf0a4d9 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -23,6 +23,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -119,8 +120,12 @@ public void commitOrClean(Set rewriteGroups) {
           e);
       throw e;
     } catch (Exception e) {
-      LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
-      rewriteGroups.forEach(this::abortFileGroup);
+      if (e instanceof CleanableFailure) {
+        LOG.error(
+            "Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
+        rewriteGroups.forEach(this::abortFileGroup);
+      }
+
       throw e;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index 01b2f7528ee3..b1322d5e58b4 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -24,6 +24,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -102,8 +103,12 @@ public void commitOrClean(Set rewriteGroups) {
           e);
       throw e;
     } catch (Exception e) {
-      LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
-      rewriteGroups.forEach(this::abort);
+      if (e instanceof CleanableFailure) {
+        LOG.error(
+            "Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
+        rewriteGroups.forEach(this::abort);
+      }
+
       throw e;
     }
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 8ec3b44f9284..d3569f45b199 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ImmutableRewriteManifests;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
@@ -354,10 +355,13 @@ private void replaceManifests(
     } catch (CommitStateUnknownException commitStateUnknownException) {
       // don't clean up added manifest files, because they may have been successfully committed.
       throw commitStateUnknownException;
-    } catch (Exception e) {
-      // delete all new manifests because the rewrite failed
-      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
-      throw e;
+    } catch (Throwable failure) {
+      if (failure instanceof CleanableFailure) {
+        // delete all new manifests because the rewrite failed
+        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      }
+
+      throw failure;
     }
   }

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index a964f7686394..026629aa5c17 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -44,7 +44,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Context context;
   private final Map writeProperties;

-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;

   SparkPositionDeltaWrite(
       SparkSession spark,
@@ -304,9 +304,12 @@ private void commitOperation(SnapshotUpdate operation, String description) {
         operation.commit(); // abort is automatically called if this fails
         long duration = System.currentTimeMillis() - start;
         LOG.info("Committed in {} ms", duration);
-      } catch (CommitStateUnknownException commitStateUnknownException) {
-        cleanupOnAbort = false;
-        throw commitStateUnknownException;
+      } catch (Throwable failure) {
+        if (failure instanceof CleanableFailure) {
+          cleanupOnAbort = true;
+        }
+
+        throw failure;
       }
     }
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d23c473bb46b..ff4772fb48aa 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final SparkWriteRequirements writeRequirements;
   private final Map writeProperties;

-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;

   SparkWrite(
       SparkSession spark,
@@ -233,9 +233,12 @@ private void commitOperation(SnapshotUpdate operation, String description) {
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);
-    } catch (CommitStateUnknownException commitStateUnknownException) {
-      cleanupOnAbort = false;
-      throw commitStateUnknownException;
+    } catch (Throwable failure) {
+      if (failure instanceof CleanableFailure) {
+        cleanupOnAbort = true;
+      }
+
+      throw failure;
     }
   }