
Bulk delete #6682

Merged: 12 commits into apache:master from BulkDelete, Mar 2, 2023
Conversation

RussellSpitzer (Member)

Changes all deleting Spark Actions to use FileIO bulk operations, and adds bulk delete to HadoopFileIO.

The basic idea here is that all of our deletes should use the bulk API, or at least have their parallelism controlled primarily at the FileIO level. All deletes should use some parallelism by default.
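
As a rough sketch of what this means on the caller side (pathsToDelete is a placeholder iterable here; SupportsBulkOperations.deleteFiles is the org.apache.iceberg.io bulk API this PR wires the actions to):

FileIO io = table.io();
if (io instanceof SupportsBulkOperations) {
  // One call; the FileIO decides how to batch and parallelize the deletes.
  ((SupportsBulkOperations) io).deleteFiles(pathsToDelete);
} else {
  // Fallback: single deletes through the plain FileIO API.
  for (String path : pathsToDelete) {
    io.deleteFile(path);
  }
}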

Previously, deletes were handled by a per-Action executor service used to parallelize single deletes. In this PR we move the responsibility for performing the deletes, and for parallelizing them, to the FileIO via SupportsBulkOperations.

This deprecates the methods previously used for single deletes, as well as those for passing executor services to Actions that delete many files.
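
On the FileIO side, an implementation then looks roughly like this (a sketch only, not the PR's exact HadoopFileIO code; executorService() and the retry count are illustrative, and Tasks is org.apache.iceberg.util.Tasks):

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  AtomicInteger failureCount = new AtomicInteger(0);
  Tasks.foreach(pathsToDelete)
      .executeWith(executorService())  // parallelism now lives in the FileIO
      .retry(3)
      .suppressFailureWhenFinished()
      .onFailure(
          (path, exc) -> {
            LOG.error("Failure during bulk delete on file: {}", path, exc);
            failureCount.incrementAndGet();  // fires once per path, after retries are exhausted
          })
      .run(this::deleteFile);

  if (failureCount.get() > 0) {
    throw new BulkDeletionFailureException(failureCount.get());
  }
}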
@amogh-jahagirdar (Contributor)

Thanks a ton for closing the loop on this, @RussellSpitzer! Left some comments.

@aokolnychyi (Contributor)

I am getting to this today, hopefully.

@dramaticlly (Contributor) left a comment

Would love to see this merged, thank you @RussellSpitzer

@aokolnychyi (Contributor)

I agree with the overall direction, but I'd try to support the existing API to avoid massive deprecation and to simplify the implementation. It will be hard to test all possible scenarios.

.onFailure(
    (f, e) -> {
      LOG.error("Failure during bulk delete on file: {} ", f, e);
      failureCount.incrementAndGet();
Contributor

This is going to increment the count on each failed attempt and won't be accurate. We could count the number of successfully deleted files instead and then use Iterables.size(pathsToDelete) to find how many we were supposed to delete.
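
For illustration, the suggestion looks something like this (a sketch; pathsToDelete and io are placeholders, and Iterables is the Guava helper), though as the replies below note, it walks the iterable twice:

AtomicInteger successCount = new AtomicInteger(0);
Tasks.foreach(pathsToDelete)
    .retry(3)
    .suppressFailureWhenFinished()
    .run(
        path -> {
          io.deleteFile(path);
          successCount.incrementAndGet();  // only reached when the delete succeeded
        });
int failureCount = Iterables.size(pathsToDelete) - successCount.get();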

RussellSpitzer (Member, Author)

Ah, I thought it fired once per element.

RussellSpitzer (Member, Author)

I'm not sure we want to go over the iterable more than once ... let me think about this.

RussellSpitzer (Member, Author)

I double-checked this; it only fires off when all retries are exhausted, so it is correct as is.

scala> def testFailure() = {
  var failureCount = 0
  Tasks.foreach("value")
    .retry(3)
    .onFailure((y, x: Throwable) => failureCount += 1)
    .suppressFailureWhenFinished()
    .run(x => throw new Exception("ohNO"))
  failureCount
}

scala> testFailure()
23/03/01 10:16:22 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...	
23/03/01 10:16:23 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
23/03/01 10:16:25 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
res21: Int = 1

RussellSpitzer (Member, Author)

  runTaskWithRetry(task, item);
  succeeded.add(item);
} catch (Exception e) {
  exceptions.add(e);
  if (onFailure != null) {
    tryRunOnFailure(item, e);
The code in question (runTaskWithRetry) does all retries before hitting onFailure.

Contributor

You are right, we overlooked it while reviewing another PR. I like this more. I'll update SparkCleanupUtil to follow this pattern as well.

    .suppressFailureWhenFinished()
    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
    .run(deleteFunc::accept);

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
RussellSpitzer (Member, Author)

This is the new pattern:

if (bulk) {
  bulk delete
} else {
  if no custom delete function: table.io() delete
  if custom delete function: custom delete
}

This logic is repeated in all of the actions
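
In Java terms, the dispatch looks roughly like this (a sketch; the non-bulk deleteFiles helper and executorService names are illustrative, not the PR's exact signatures):

if (deleteFunc == null && io instanceof SupportsBulkOperations) {
  // Bulk path: hand the whole iterable to the FileIO in one call.
  summary = deleteFiles((SupportsBulkOperations) io, files);
} else if (deleteFunc == null) {
  // No custom delete function: single deletes through the table's FileIO.
  summary = deleteFiles(executorService, io::deleteFile, files);
} else {
  // Caller-supplied delete function takes precedence over the FileIO.
  summary = deleteFiles(executorService, deleteFunc::accept, files);
}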

if (deleteFunc == null && io instanceof SupportsBulkOperations) {
  summary = deleteFiles((SupportsBulkOperations) io, files);
} else {

Contributor

I actually meant an empty line after the DeleteSummary variable, but formatting here is up to you.
I like the new pattern.


if (deleteFunc == null) {
  LOG.info(
      "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io());
Contributor

Same here.

@aokolnychyi (Contributor) left a comment

A few non-blocking nits. Looks great otherwise. Thanks, @RussellSpitzer! Feel free to merge whenever you are ready.

@RussellSpitzer (Member, Author)

Thanks @amogh-jahagirdar, @dramaticlly, and @aokolnychyi! I'll merge when tests pass. I'll do the backport PRs after my Subsurface talk.

@RussellSpitzer RussellSpitzer merged commit 5e40182 into apache:master Mar 2, 2023
@RussellSpitzer RussellSpitzer deleted the BulkDelete branch March 2, 2023 17:27
RussellSpitzer added a commit to RussellSpitzer/iceberg that referenced this pull request Mar 8, 2023
aokolnychyi pushed a commit that referenced this pull request Mar 10, 2023
This change backports PR #6682 to Spark 3.2.
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023