AWS: Use executor service by default when performing batch deletion of files #5379
Conversation
I'm running AWS integ tests to validate this.
-    if (!awsProperties.isS3DeleteEnabled()) {
-      return;
+    if (awsProperties.isS3DeleteEnabled()) {
+      SetMultimap<String, String> bucketToObjects = computeBucketToObjects(paths);
We are now eagerly computing the bucket-to-objects mapping up front. Previously we would iterate over the paths, track the objects per bucket, and once the objects for a given bucket reached the batch size, trigger the deletion and remove that mapping.
Now it's all computed up front, so memory consumption is higher, but I think it should be OK: 1 million objects with a max key size of 1024 bytes is about 1 GB of paths held in memory.
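For reference, the eager grouping could look roughly like this. This is an illustrative sketch only: the real code uses Iceberg's S3URI parsing and relocated Guava, and `computeBucketToObjects` here is simplified.

```java
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

// Illustrative sketch: group every s3://bucket/key path by bucket so each
// bucket's objects can later be deleted in batches.
class BucketGrouping {
  static SetMultimap<String, String> computeBucketToObjects(Iterable<String> paths) {
    SetMultimap<String, String> bucketToObjects = HashMultimap.create();
    for (String path : paths) {
      String withoutScheme = path.substring("s3://".length()); // simplified URI parsing
      int firstSlash = withoutScheme.indexOf('/');
      String bucket = withoutScheme.substring(0, firstSlash);
      String key = withoutScheme.substring(firstSlash + 1);
      bucketToObjects.put(bucket, key);
    }
    return bucketToObjects;
  }
}
```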
1 GB does sound like a lot, e.g. on the Spark driver. Is it still possible to do it via streaming, i.e. submit the deletion batch to Tasks once it gets full?
Can we try to use https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/paginators/ListObjectsV2Iterable.html so that the list is loaded dynamically instead of buffered up front?
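For context, the SDK paginator streams listing pages lazily; a minimal sketch (bucket and prefix here are placeholder values):

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

S3Client s3 = S3Client.create();
ListObjectsV2Iterable pages =
    s3.listObjectsV2Paginator(
        ListObjectsV2Request.builder().bucket("my-bucket").prefix("data/").build());

// Pages are fetched from S3 on demand as the iterable is consumed,
// so the full listing is never buffered in memory.
for (S3Object object : pages.contents()) {
  System.out.println(object.key());
}
```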
@jackye1995 I don't think we actually need a separate API for listing; we are already given the iterable of paths, and that iterable can be a ListObjectsV2Iterable (as deletePrefix does today).
To load lazily while still deleting concurrently, we can do the following (see the sketch below):
1.) Keep the previous approach: construct the bucket/key from each path and track when the objects for a bucket hit a certain batch size.
2.) Instead of using the Tasks framework, submit the deletion to an executor service and just keep track of the future. We don't want to use Tasks because it waits for completion internally.
We want to just submit the batch deletion and move on, then check the statuses of all those tasks at the very end. The underlying executor service fits that pattern. Let me know what you think.
Will update the PR.
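A rough sketch of that pattern, under stated assumptions: the batch size, thread pool, and `deleteBatch` helper are placeholders, not the actual PR code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: submit each full batch to an executor service and only
// check the outcomes at the very end, instead of waiting per batch via Tasks.
class BatchDeleter {
  private static final int BATCH_SIZE = 250; // placeholder; S3 DeleteObjects caps at 1000 keys
  private final ExecutorService pool = Executors.newFixedThreadPool(8); // placeholder pool

  List<String> deleteAll(String bucket, Iterable<String> keys) {
    List<CompletableFuture<List<String>>> pending = new ArrayList<>();
    List<String> batch = new ArrayList<>();
    for (String key : keys) {
      batch.add(key);
      if (batch.size() == BATCH_SIZE) {
        List<String> toDelete = batch;
        // Submit and move on without blocking on completion here.
        pending.add(CompletableFuture.supplyAsync(() -> deleteBatch(bucket, toDelete), pool));
        batch = new ArrayList<>();
      }
    }
    if (!batch.isEmpty()) {
      List<String> toDelete = batch;
      pending.add(CompletableFuture.supplyAsync(() -> deleteBatch(bucket, toDelete), pool));
    }
    // Only now wait for all submitted batches and collect keys that failed.
    List<String> failed = new ArrayList<>();
    pending.forEach(future -> failed.addAll(future.join()));
    return failed;
  }

  // Stub: the real code would issue the S3 DeleteObjects call and return failed keys.
  private List<String> deleteBatch(String bucket, List<String> keys) {
    return List.of();
  }
}
```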
Sounds good!
I think this is probably a valid change, but as of now it doesn't seem to match the PR description? The PR description is about using an executor service. Can you clarify the PR description?
Updated the description so it reflects the new approach. I also removed the integ test fixes and moved them to #5413. Thanks!
Let me take another look tomorrow. Sorry for the delay!
  }
} catch (Exception e) {
[doubt] Any reason we are catching a generic exception here?
Yeah, I think in case of any failure we should surface a BulkDeletionFailure at the end. Catching the generic exception lets us handle any failure, treat it as a failure to delete the entire batch, and add those files to the failed list. I'm not aware of any case where we'd want to surface something else, and we log the specific exception so folks can debug.
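Roughly the pattern described, as an illustrative fragment: `LOG`, `deleteBatch`, and `failedDeletions` are hypothetical stand-ins, not the exact PR code.

```java
try {
  deleteBatch(bucket, keys); // hypothetical helper issuing the actual S3 DeleteObjects call
} catch (Exception e) {
  // Treat any failure as a failure of the whole batch: log the specific cause
  // for debugging and record the keys so a BulkDeletionFailureException can be
  // surfaced once all batches have completed.
  LOG.warn("Failed to delete batch of {} objects in bucket {}", keys.size(), bucket, e);
  failedDeletions.addAll(keys);
}
```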
This change looks good to me.
After a closer look, passing an explicit executor started to make more sense to me. I think we may add that overloaded method back once we consume these changes in other places.
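If that need materializes, the overload would presumably look something like the following; this is a purely hypothetical signature, not part of this PR.

```java
// Hypothetical overload, alongside the existing deleteFiles(Iterable<String>):
void deleteFiles(Iterable<String> pathsToDelete, ExecutorService executorService)
    throws BulkDeletionFailureException;
```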
Right, this is my thought as well. Once we see there is a need for the API we can add it; it's harder to go the other way. Thanks for the review @aokolnychyi!
I think we got enough approvals. Thanks for the work @amogh-jahagirdar, and thanks everyone for the review!
Thanks everyone for the reviews! @jackye1995 @singhpk234 @aokolnychyi @szehon-ho
(cherry picked from commit f6d9ddc)
Update S3FileIO to lazily load batches and use the existing deletion thread pool to perform concurrent S3#RemoveObjects calls.
This will be used in subsequent PRs to perform bulk deletes in procedures such as removing orphan files, expiring snapshots, and purging data, manifest, and old metadata files during table drop.
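For illustration, a caller such as an orphan-file-removal procedure might take the bulk path when the FileIO supports it. This is a hedged sketch: `deleteInBulkIfSupported` and `pathsToDelete` are made-up names, not Iceberg API.

```java
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

// Sketch: prefer the bulk deletion path when available, otherwise fall back
// to deleting files one at a time.
void deleteInBulkIfSupported(FileIO io, Iterable<String> pathsToDelete) {
  if (io instanceof SupportsBulkOperations) {
    try {
      ((SupportsBulkOperations) io).deleteFiles(pathsToDelete);
    } catch (BulkDeletionFailureException e) {
      // Some files could not be deleted; the exception reports how many failed.
    }
  } else {
    pathsToDelete.forEach(io::deleteFile);
  }
}
```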