API, Spark: Update remove orphan files procedure to use bulk deletion if applicable #5373
Conversation
Still figuring out a good way to write tests for this, but in the interim @RussellSpitzer @aokolnychyi @karuppayya @singhpk234 @jackye1995 I would like to get your feedback!
if (batchDeletionSize > 1) {
  Preconditions.checkArgument(
      table.io() instanceof SupportsBulkOperations,
      "FileIO %s does not support bulk deletion",
      table.io().getClass().getName());
  SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
  List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
  Tasks.foreach(fileBatches)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .run(bulkFileIO::deleteFiles);
} else {
  Tasks.foreach(orphanFiles)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(deleteFunc::accept);
}
This can probably be abstracted away into a single method which delegates to the right approach but I didn't want to introduce more indirection or expose public methods unnecessarily until we know for sure we want them.
+1, having a helper/wrapper around this if/else would be a good thing to have, and could help in adding this change in more places!
[Not in scope of this PR] Any thoughts on ResolvingFileIO, which is a wrapper around other FileIOs?
"FileIO %s does not support bulk deletion", | ||
table.io().getClass().getName()); | ||
SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io(); | ||
List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize); |
I'm thinking we leave the batching logic to the fileIO. Since the pattern being followed for more advanced file io interaction is to have mixin interfaces, this lets us delegate more to fileIO with "special" capabilities. So I think it makes sense to remove batching from the action itself and just let the fileIO take care of it.
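A minimal sketch of that delegation (assumption: SupportsBulkOperations is the mixin that exposes deleteFiles, and the action no longer partitions at all):

// Sketch only: no partitioning in the action. The bulk-capable FileIO chunks the list
// internally according to its own configuration (e.g. S3FileIO's delete batch size);
// the existing per-file deleteFunc path stays as the fallback.
if (table.io() instanceof SupportsBulkOperations) {
  ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
}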
If I understand correctly, this batchDeletionSize determines how long a list can be passed to the FileIO for batch deletion, so for the FileIO to handle the batching itself, I guess this would need to be set larger than expected?
Say we set batchDeletionSize = 50 in the remove-orphan call and S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250; then S3 can only batch 50 orphan files in a single call.
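To make that interplay concrete, a small sketch (s3Client and orphanFiles are placeholders, and the property key is assumed to be S3FileIO's s3.delete.batch-size):

// Sketch: even with S3FileIO configured for 250-key delete batches, handing it
// 50-path chunks from the action caps each underlying DeleteObjects request at 50 keys.
S3FileIO io = new S3FileIO(() -> s3Client);
io.initialize(ImmutableMap.of("s3.delete.batch-size", "250"));
for (List<String> chunk : Lists.partition(orphanFiles, 50)) {
  io.deleteFiles(chunk); // at most 50 keys per call, despite the configured 250
}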
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .run(bulkFileIO::deleteFiles);
Was looking at this a bit with @dramaticlly earlier. My concern here is the existing 'deleteFunc' plugin mechanism, which I know some users are using for dry-run purposes, i.e. printing out the list of files to delete instead of actually deleting them. I'm thinking:
- do a precondition check whether deleteFunc != defaultDeleteFunc; if so, then don't allow bulkDelete
- add a bulk deleteFunc?
@aokolnychyi any thoughts?
Hey Szehon, I'm going to update this PR based on my change in https://github.com/apache/iceberg/pull/5379/files.
My thinking is we should always just use deleteFunc if it's passed in (it should be the source of truth for deletion if it's set). If it's not set and the FileIO supports bulk operations, we can just do fileIO.deleteFiles(); otherwise we fall back to the existing mechanism.
That way we preserve existing behavior for the procedure and add the optimization for FileIO types which support bulk delete. Also, the underlying FileIO can take care of how the batches are created (so it's optimal for the underlying storage, in the sense of maximizing deletion throughput, minimizing throttling, etc.). The underlying FileIO can also handle parallelism as it desires, so at the procedure level we don't need to pass in a batch size.
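A sketch of that precedence (not necessarily the exact merged code; names follow the diff above):

// Sketch: a caller-supplied deleteFunc always wins; otherwise use bulk deletion when
// the FileIO supports it; otherwise fall back to per-file deletes on the FileIO.
if (deleteFunc != null) {
  Tasks.foreach(orphanFiles)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(deleteFunc::accept);
} else if (table.io() instanceof SupportsBulkOperations) {
  // The FileIO decides batching and parallelism internally.
  ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
} else {
  Tasks.foreach(orphanFiles)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(table.io()::deleteFile);
}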
LGTM, I was thinking about adding one for expire snapshots as well. But looking at this change, I noticed there's no test coverage included; can we add any unit tests for this?
"FileIO %s does not support bulk deletion", | ||
table.io().getClass().getName()); | ||
SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io(); | ||
List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if I understand correctly, this batchDeletionSize determine how long of a list can be passed to fileIO batch deletion, so in order for fileIO to handle I guess this would be set to larger than expected one?
Say if we set batchDeletionSize = 50
in remove-orphan call and S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250
, the S3 can only bucket 50 orphan files in a single call
@szehon-ho @dramaticlly @aokolnychyi also for this PR, I'll just focus on Spark 3.3, and we can copy the implementation changes to the other versions in separate PRs.
Yeah for sure, I still need to add tests for this.
I've updated this PR to be based on my change here: https://github.com/apache/iceberg/pull/5379/files
In the updated logic for the procedure, we will always use the delete func if it's specified. If a delete function is not specified and the FileIO does not support bulk operations, we use the default delete function. Otherwise, the bulk delete is used by default. What this means is that bulk operations will always be used (if the FileIO supports them) without considering user input. I was discussing with @dramaticlly that perhaps we want more control over this and it should be specified in the action (something like useBulkDelete())?
Also, now we are delegating task management to the FileIO, which I think makes sense, but there's another argument that each procedure should control this, since failure handling or retries would depend on the desired behavior for the procedure. What are people's thoughts here? @dramaticlly @aokolnychyi @RussellSpitzer @karuppayya
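For reference, the action-level switch being floated could look something like this (useBulkDelete is hypothetical and not part of this PR; table and olderThanMillis are placeholders):

// Hypothetical builder option on the action, as floated above; not an existing API.
DeleteOrphanFiles.Result result =
    SparkActions.get()
        .deleteOrphanFiles(table)
        .olderThan(olderThanMillis)
        .useBulkDelete(true) // hypothetical: opt in/out of bulk deletion explicitly
        .execute();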
Thanks @amogh-jahagirdar, I was mostly thinking about this along the same lines for #5412. In the ideal case, we should try to use the bulk operation if it's available to us (by inspecting the FileIO of the given table; S3FileIO is the only supported one as of now). On the other side, I think we need to maintain backward compatibility if some Iceberg user already provides a custom delete function. In my change, I added a new public method for this.
Personally I think delegating batching to the FileIO makes sense to me; adding an additional parameter like batchSize to the procedure call might cause confusion and be hard to get right. The downside I see is around the test harness, and I'd like more input on how to test this across classes. The current test classes all use HadoopTables, and picking HiveTable with a FileIO that supports SupportsBulkOperations seems like overkill for unit tests.
I think that's a good simplification too: if the user wants customization, then they can deal with a non-bulk (slower) delete, but we throw an exception if they want a customized delete function AND bulk.
I'm not sure that throwing an exception when a user specifies a delete function and the FileIO supports bulk delete is the way to go, because then we're changing the behavior of the exposed deleteFunc API. I think if deleteFunc is set, the procedure continues to use that as the source of truth regardless of bulk delete support. If we threw an exception, that would mean a user's code would need to get rewritten if it's using S3FileIO and running this procedure with a custom delete. Let me know if I'm misunderstanding! That being said, we are now changing the behavior if they do not specify a delete func and the FileIO supports bulk delete. This change is less intrusive because it changes how the procedure runs internally and is not really exposed to a user. Let me know what you think.
Yeah, this is the tricky part. My thinking on the testing is the following:
1. We should validate at the procedure level that we are guaranteeing the existing behavior (failing at the expected points, using the parameters correctly, etc.). For the supports-bulk-delete case, I think we can hit that case through a spy object, although that's not a particularly clean way.
2. We should guarantee that the FileIO which supports bulk delete (in this case, S3FileIO) has good test coverage to make sure its deleteFiles works properly.
If we do 1 and 2 we can get better confidence.
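A rough sketch of the spy-based idea in 1 (Mockito; s3Client, catalog, and tableIdent are placeholders, and swapping the table's FileIO this way is an assumption about the test setup):

// Spy a bulk-capable FileIO and verify the action routed deletes through deleteFiles
// rather than per-file deleteFile calls.
S3FileIO spyIO = Mockito.spy(new S3FileIO(() -> s3Client));
Table table = Mockito.spy(catalog.loadTable(tableIdent));
Mockito.when(table.io()).thenReturn(spyIO);

SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

Mockito.verify(spyIO, Mockito.atLeastOnce()).deleteFiles(Mockito.anyIterable());
Mockito.verify(spyIO, Mockito.never()).deleteFile(Mockito.anyString());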
Maybe it was a bit of a misunderstanding; I was talking about the flag you are discussing with @dramaticlly (useBulkDelete). So I was thinking, if useBulkDelete is on && deleteFunc is set, then it's a misconfiguration. But are we doing the flag? Or are you suggesting that deleteFunc always takes precedence, i.e. if deleteFunc is set, always use the single-file deleteFunc; otherwise, if the FileIO supports bulk operations, automatically use the bulk delete?
What's the choice here? I suppose we will have to have an extra parameter on SupportsBulkOperations FileIOs to control retries, and this can be set by the various procedures? I think after #5379 it will be easy to implement, as S3FileIO can just pass that parameter to Tasks?
Closing as a duplicate of #6682.
In this change, the DeleteOrphanFiles procedure has been updated to perform bulk deletion when a deletion batch size is set and bulk deletion is supported. If a batch size greater than 1 is set and the underlying FileIO does not support bulk deletion, the procedure will fail.
There may be more sophisticated heuristics for determining whether bulk deletion should be used, but for now it seems reasonable that it is used when the FileIO supports it and the batch size is greater than 1.
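As a usage-level illustration of the deleteFunc vs. bulk interplay discussed in this thread (table and olderThanMillis are placeholders; deleteWith and olderThan are existing action options, while the bulk behavior is what this PR proposes):

// With a custom deleteWith (e.g. a dry run), the per-file path is used.
DeleteOrphanFiles.Result dryRun =
    SparkActions.get()
        .deleteOrphanFiles(table)
        .olderThan(olderThanMillis)
        .deleteWith(path -> LOG.info("Would delete orphan file: {}", path))
        .execute();

// Without a custom delete function, a bulk-capable FileIO (e.g. S3FileIO) is eligible
// to remove the orphan files via deleteFiles instead of one-by-one calls.
DeleteOrphanFiles.Result removed =
    SparkActions.get()
        .deleteOrphanFiles(table)
        .olderThan(olderThanMillis)
        .execute();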