Bulk delete #6682
Previously, deletes were handled by a per-Action executor service used to parallelize single deletes. In this PR we move the responsibility for performing deletes, and for parallelizing them, to the FileIO via SupportsBulkOperations. This deprecates all methods previously used for single deletes, as well as those for passing executor services to Actions that delete many files.
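As a rough sketch of the dispatch this PR introduces (using simplified stand-in interfaces, not the real org.apache.iceberg.io types, which carry more methods):

```java
import java.util.ArrayList;
import java.util.List;

public class BulkDeleteSketch {
    // Simplified stand-ins for the Iceberg interfaces this PR builds on.
    interface FileIO {
        void deleteFile(String path);
    }

    // A FileIO that can delete many files in one call owns its own batching
    // and parallelism, so actions no longer need to pass executor services.
    interface SupportsBulkOperations extends FileIO {
        void deleteFiles(Iterable<String> paths);
    }

    // Prefer the bulk path when the IO supports it; otherwise fall back
    // to one-by-one deletes.
    static int delete(FileIO io, List<String> paths) {
        if (io instanceof SupportsBulkOperations) {
            ((SupportsBulkOperations) io).deleteFiles(paths);
        } else {
            paths.forEach(io::deleteFile);
        }
        return paths.size();
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        SupportsBulkOperations bulkIo = new SupportsBulkOperations() {
            @Override public void deleteFile(String path) { deleted.add(path); }
            @Override public void deleteFiles(Iterable<String> paths) {
                paths.forEach(deleted::add);
            }
        };
        System.out.println(delete(bulkIo, List.of("a.parquet", "b.parquet"))); // prints 2
    }
}
```

The point of the dispatch is that callers stop caring about parallelism: a bulk-capable IO receives the whole iterable and parallelizes internally.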
Thanks a ton for closing the loop on this, @RussellSpitzer! Left some comments.
I am getting to this today, hopefully.
Would love to see this merged, thank you @RussellSpitzer
I agree with the overall direction, but I'd try to support the existing API to avoid massive deprecation and simplify the implementation. It will be hard to test all possible scenarios.
    .onFailure(
        (f, e) -> {
          LOG.error("Failure during bulk delete on file: {}", f, e);
          failureCount.incrementAndGet();
This is going to increment the count on each failed attempt and won't be accurate. We could count the number of successfully deleted files instead and then use Iterables.size(pathsToDelete) to find how many we were supposed to delete.
Ah, I thought it was once per element.
I'm not sure we want to go over the iterable more than once ... let me think about this
I double checked this, it only fires off when all retries are exhausted so it is correct as is.
scala> def testFailure() = {
         var failureCount = 0
         Tasks.foreach("value")
           .retry(3)
           .onFailure((y, x: Throwable) => failureCount += 1)
           .suppressFailureWhenFinished()
           .run(x => throw new Exception("ohNO"))
         failureCount
       }

scala> testFailure()
23/03/01 10:16:22 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
23/03/01 10:16:23 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
23/03/01 10:16:25 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
res21: Int = 1
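The same semantics can be sketched in a few lines of Java (using a toy retry helper for illustration, not the real Tasks class): the failure callback runs at most once per item, only after all retries are exhausted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RetryFailureCount {
    // Toy retry loop mirroring the behavior confirmed above: onFailure
    // fires once per item, after the final failed attempt.
    static <T> void runWithRetry(T item, int retries, Consumer<T> task,
                                 BiConsumer<T, Exception> onFailure) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                task.accept(item);
                return; // success: the failure callback never fires
            } catch (Exception e) {
                if (attempt == retries) {
                    onFailure.accept(item, e); // fires exactly once
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger failureCount = new AtomicInteger();
        runWithRetry("value", 3,
            x -> { throw new RuntimeException("ohNO"); },
            (x, e) -> failureCount.incrementAndGet());
        System.out.println(failureCount.get()); // prints 1, not 4
    }
}
```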
iceberg/core/src/main/java/org/apache/iceberg/util/Tasks.java, lines 219 to 225 at commit 715c9b9:
    runTaskWithRetry(task, item);
    succeeded.add(item);
  } catch (Exception e) {
    exceptions.add(e);
    if (onFailure != null) {
      tryRunOnFailure(item, e);
You are right, we overlooked it while reviewing another PR. I like it more. I'll update SparkCleanupUtil to follow this pattern as well.
    .suppressFailureWhenFinished()
    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
    .run(deleteFunc::accept);

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
This is the new pattern:

  if (bulk) {
    bulk delete
  } else {
    if no custom delete: table.io() delete
    if custom delete: custom delete
  }

This logic is repeated in all of the actions.
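A minimal sketch of that three-way dispatch (illustrative stand-in types and names, not the actual Spark action code):

```java
import java.util.List;
import java.util.function.Consumer;

public class DeleteDispatchSketch {
    // Illustrative stand-ins for the Iceberg IO interfaces.
    interface FileIO { void deleteFile(String path); }
    interface SupportsBulkOperations extends FileIO {
        void deleteFiles(Iterable<String> paths);
    }

    // Bulk when there is no custom delete function and the IO supports it;
    // otherwise single deletes via either the FileIO or the custom function.
    static String dispatch(FileIO io, Consumer<String> deleteFunc, List<String> files) {
        if (deleteFunc == null && io instanceof SupportsBulkOperations) {
            ((SupportsBulkOperations) io).deleteFiles(files);
            return "bulk";
        } else if (deleteFunc == null) {
            files.forEach(io::deleteFile);
            return "single";
        } else {
            files.forEach(deleteFunc);
            return "custom";
        }
    }

    public static void main(String[] args) {
        FileIO plain = path -> {};
        System.out.println(dispatch(plain, null, List.of("a")));   // prints single
        System.out.println(dispatch(plain, p -> {}, List.of("a"))); // prints custom
    }
}
```

A user-supplied delete function wins over the bulk path because the caller may be doing something other than a plain filesystem delete.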
if (deleteFunc == null && io instanceof SupportsBulkOperations) {
  summary = deleteFiles((SupportsBulkOperations) io, files);
} else {
I actually meant an empty line after the DeleteSummary var, but formatting here is up to you. I like the new pattern.
if (deleteFunc == null) {
  LOG.info(
      "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io());
Same here.
A few non-blocking nits. Looks great otherwise. Thanks, @RussellSpitzer! Feel free to merge whenever you are ready.
Thanks @amogh-jahagirdar, @dramaticlly, and @aokolnychyi. I'll merge when tests pass. I'll do the backport PRs after my Subsurface talk.
This change backports PR #6682 to Spark 3.2.
Previously, deletes were handled by a per-Action executor service used to parallelize single deletes. This PR moves the responsibility for performing deletes, and for parallelizing them, to the FileIO via SupportsBulkOperations.
Changes all deleting Spark Actions to use FileIO bulk operations, and adds bulk delete to HadoopIO.
The basic idea here is that all of our deletes should either use the bulk API or have their parallelism controlled primarily at the FileIO level, and all deletes should use some parallelism by default.
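As a sketch of what "parallelism at the FileIO level" can look like, a bulk-capable IO might fan deletes out over its own thread pool internally (the names here are illustrative; this is not the actual HadoopIO implementation):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelBulkDeleteSketch {
    // A bulk delete that parallelizes internally, so callers pass only the
    // paths and never manage an executor service themselves.
    static int deleteFiles(Iterable<String> paths, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        ConcurrentLinkedQueue<String> deleted = new ConcurrentLinkedQueue<>();
        for (String path : paths) {
            pool.submit(() -> deleted.add(path)); // stand-in for the real filesystem delete
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return deleted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(deleteFiles(List.of("a", "b", "c"), 2)); // prints 3
    }
}
```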