Skip to content

Commit

Permalink
Core, Spark: Spark writes/actions should only perform cleanup if fail…
Browse files Browse the repository at this point in the history
…ure is cleanable
  • Loading branch information
amogh-jahagirdar committed Jun 20, 2024
1 parent 08cc776 commit 030df19
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -308,12 +309,15 @@ private void replaceDataFiles(
LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
throw e;
} catch (Exception e) {
LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
if (e instanceof CleanableFailure) {
LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -119,8 +120,12 @@ public void commitOrClean(Set<RewriteFileGroup> rewriteGroups) {
e);
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abortFileGroup);
if (e instanceof CleanableFailure) {
LOG.error(
"Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abortFileGroup);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -102,8 +103,12 @@ public void commitOrClean(Set<RewritePositionDeletesGroup> rewriteGroups) {
e);
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abort);
if (e instanceof CleanableFailure) {
LOG.error(
"Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abort);
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -673,14 +674,14 @@ public void testSingleCommitWithCommitFailure() {
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));

// Fail to commit
doThrow(new RuntimeException("Commit Failure")).when(util).commitFileGroups(any());
doThrow(new CommitFailedException("Commit Failure")).when(util).commitFileGroups(any());

doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());

assertThatThrownBy(() -> spyRewrite.execute())
.as("Should fail entire rewrite if commit fails")
.isInstanceOf(RuntimeException.class)
.hasMessage("Commit Failure");
.hasMessageContaining("Cannot commit rewrite");

table.refresh();

Expand All @@ -692,6 +693,40 @@ public void testSingleCommitWithCommitFailure() {
shouldHaveACleanCache(table);
}

@Test
public void testCommitFailsWithUncleanableFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);

List<Object[]> originalData = currentData();

RewriteDataFilesSparkAction realRewrite =
basicRewrite(table)
.option(
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000));

RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));

// Fail to commit with an arbitrary failure and validate that orphans are not cleaned up
doThrow(new RuntimeException("Arbitrary Failure")).when(util).commitFileGroups(any());

doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());

assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Arbitrary Failure");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

shouldHaveSnapshots(table, 1);
shouldHaveOrphans(table);
shouldHaveACleanCache(table);
}

@Test
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
Expand All @@ -709,13 +744,13 @@ public void testParallelSingleCommitWithRewriteFailure() {

// Fail groups 1, 3, and 7 during rewrite
GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
doThrow(new RuntimeException("Rewrite Failed"))
doThrow(new CommitFailedException("Rewrite Failed"))
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));

assertThatThrownBy(() -> spyRewrite.execute())
.as("Should fail entire rewrite if part fails")
.isInstanceOf(RuntimeException.class)
.isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");

table.refresh();
Expand Down Expand Up @@ -830,7 +865,7 @@ public void testParallelPartialProgressWithCommitFailure() {

// First and Third commits work, second does not
doCallRealMethod()
.doThrow(new RuntimeException("Commit Failed"))
.doThrow(new CommitFailedException("Commit Failed"))
.doCallRealMethod()
.when(util)
.commitFileGroups(any());
Expand Down Expand Up @@ -1562,6 +1597,17 @@ protected void shouldHaveNoOrphans(Table table) {
.orphanFileLocations());
}

protected void shouldHaveOrphans(Table table) {
assertThat(
actions()
.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis())
.execute()
.orphanFileLocations())
.as("Should have found orphan files")
.isNotEmpty();
}

protected void shouldHaveACleanCache(Table table) {
Assert.assertEquals(
"Should not have any entries in cache", ImmutableSet.of(), cacheContents(table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -707,13 +708,13 @@ public void testSingleCommitWithCommitFailure() {
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));

// Fail to commit
doThrow(new RuntimeException("Commit Failure")).when(util).commitFileGroups(any());
doThrow(new CommitFailedException("Commit Failure")).when(util).commitFileGroups(any());

doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());

assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
.hasMessage("Commit Failure");
.hasMessageContaining("Cannot commit rewrite");

table.refresh();

Expand All @@ -725,6 +726,40 @@ public void testSingleCommitWithCommitFailure() {
shouldHaveACleanCache(table);
}

@Test
public void testCommitFailsWithUncleanableFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);

List<Object[]> originalData = currentData();

RewriteDataFilesSparkAction realRewrite =
basicRewrite(table)
.option(
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000));

RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));

// Fail to commit with an arbitrary failure and validate that orphans are not cleaned up
doThrow(new RuntimeException("Arbitrary Failure")).when(util).commitFileGroups(any());

doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());

assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Arbitrary Failure");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

shouldHaveSnapshots(table, 1);
shouldHaveOrphans(table);
shouldHaveACleanCache(table);
}

@Test
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
Expand All @@ -742,12 +777,12 @@ public void testParallelSingleCommitWithRewriteFailure() {

// Fail groups 1, 3, and 7 during rewrite
GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
doThrow(new RuntimeException("Rewrite Failed"))
doThrow(new CommitFailedException("Rewrite Failed"))
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));

assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
.isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");

table.refresh();
Expand Down Expand Up @@ -866,7 +901,7 @@ public void testParallelPartialProgressWithCommitFailure() {

// First and Third commits work, second does not
doCallRealMethod()
.doThrow(new RuntimeException("Commit Failed"))
.doThrow(new CommitFailedException("Commit Failed"))
.doCallRealMethod()
.when(util)
.commitFileGroups(any());
Expand Down Expand Up @@ -1599,6 +1634,17 @@ protected void shouldHaveNoOrphans(Table table) {
.orphanFileLocations());
}

protected void shouldHaveOrphans(Table table) {
assertThat(
actions()
.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis())
.execute()
.orphanFileLocations())
.as("Should have found orphan files")
.isNotEmpty();
}

protected void shouldHaveACleanCache(Table table) {
Assert.assertEquals(
"Should not have any entries in cache", ImmutableSet.of(), cacheContents(table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -355,8 +356,11 @@ private void replaceManifests(
// don't clean up added manifest files, because they may have been successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
if (e instanceof CleanableFailure) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
Expand Down Expand Up @@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkPositionDeltaWrite(
SparkSession spark,
Expand Down Expand Up @@ -304,9 +304,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
Expand Down Expand Up @@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkWrite(
SparkSession spark,
Expand Down Expand Up @@ -233,9 +233,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}

Expand Down
Loading

0 comments on commit 030df19

Please sign in to comment.