API, Spark: Update remove orphan files procedure to use bulk deletion if applicable #5373

Conversation

amogh-jahagirdar
Contributor

@amogh-jahagirdar amogh-jahagirdar commented Jul 28, 2022

In this change, the DeleteOrphanFiles procedure has been updated to perform bulk deletion when a deletion batch size is set and the underlying FileIO supports it. If a batch size greater than 1 is set and the underlying FileIO does not support batch deletion, the procedure will fail.

There may be more sophisticated heuristics for determining whether bulk deletion should be used, but for now it seems reasonable to use it when the FileIO supports it and the batch size is greater than 1.

@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch 4 times, most recently from 702b92f to 31cf564 Compare July 28, 2022 05:09
@amogh-jahagirdar amogh-jahagirdar changed the title API, Spark: Update remove orphan files procedure for performing batch deletion if it's supported and a batch deletion size greater than 1 API, Spark: Update remove orphan files procedure to use batch deletion if applicable Jul 28, 2022
@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch 5 times, most recently from efb7fe1 to eadd945 Compare July 28, 2022 05:32
@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review July 28, 2022 05:33
@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch 2 times, most recently from d94843c to a0afe6e Compare July 28, 2022 05:50
@amogh-jahagirdar
Contributor Author

amogh-jahagirdar commented Jul 28, 2022

Still figuring out a good way to write tests for this, but in the interim @RussellSpitzer @aokolnychyi @karuppayya @singhpk234 @jackye1995 I would like to get your feedback!

@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch from a0afe6e to c315669 Compare July 28, 2022 06:53
@amogh-jahagirdar amogh-jahagirdar changed the title API, Spark: Update remove orphan files procedure to use batch deletion if applicable API, Spark: Update remove orphan files procedure to use bulk deletion if applicable Jul 28, 2022
Comment on lines 195 to 214
if (batchDeletionSize > 1) {
Preconditions.checkArgument(
table.io() instanceof SupportsBulkOperations,
"FileIO %s does not support bulk deletion",
table.io().getClass().getName());
SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
Tasks.foreach(fileBatches)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.run(bulkFileIO::deleteFiles);
} else {
Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept);
}
Contributor Author

This can probably be abstracted away into a single method which delegates to the right approach, but I didn't want to introduce more indirection or expose public methods unnecessarily until we know for sure we want them.

Contributor

+1, having a helper / wrapper around this if..else would be good to have, and could help in adding this change in more places!

[Not in scope of this PR] Any thoughts on ResolvingFileIO, which is a wrapper over other FileIOs?

"FileIO %s does not support bulk deletion",
table.io().getClass().getName());
SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
Contributor Author

I'm thinking we leave the batching logic to the FileIO. Since the pattern being followed for more advanced FileIO interaction is to have mixin interfaces, this lets us delegate more to FileIOs with "special" capabilities. So I think it makes sense to remove batching from the action itself and just let the FileIO take care of it.

Contributor

If I understand correctly, this batchDeletionSize determines how long a list can be passed to the FileIO batch deletion, so for the FileIO to handle the batching itself I guess this would need to be set larger than its own batch size?

Say we set batchDeletionSize = 50 in the remove-orphan call and S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250; then S3 can only batch 50 orphan files in a single call.

.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.run(bulkFileIO::deleteFiles);
Collaborator

Was looking a bit at this with @dramaticlly earlier. My concern here is the existing 'deleteFunc' plugin mechanism, which I know some users are using for dry-run purposes, i.e. printing out the list of files to delete instead of actually deleting them. I'm thinking:

  • do a precondition check: if the deleteFunc != defaultDeleteFunc, then don't allow bulkDelete (see the sketch after this list)
  • add a bulk deleteFunc?
    @aokolnychyi any thoughts?
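
A rough sketch of the first idea (defaultDeleteFunc is a hypothetical name here, not an existing field on the action):

// Hypothetical: only take the bulk path when the caller did not swap in a custom deleteFunc.
if (table.io() instanceof SupportsBulkOperations) {
  Preconditions.checkArgument(
      deleteFunc == defaultDeleteFunc,
      "Cannot combine a custom delete function with bulk deletion");
  ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
}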

Contributor Author

@amogh-jahagirdar amogh-jahagirdar Aug 1, 2022

Hey Szehon, I'm going to update this PR based on my change in https://github.com/apache/iceberg/pull/5379/files.

My thinking is we should always just use deleteFunc if it's passed in (it should be the source of truth for deletion if it's set). If it's not set and the FileIO supports bulk operations, we can just do fileIO.deleteFiles(); otherwise we go back to the existing mechanism.

That way we preserve the existing behavior of the procedure and get the additional optimization for FileIO types which support bulk delete. Also, the underlying FileIO can take care of how the batches are created (so it's optimal for the underlying storage, in the sense of maximizing deletion throughput, minimizing throttling, etc.). The underlying FileIO can also handle parallelism as it desires. So at the procedure level we don't need to pass in a batch size.
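
Roughly, something like this (variable names are illustrative, not the exact fields of the Spark action):

// Sketch: a caller-supplied deleteFunc wins; otherwise prefer bulk deletes when the
// FileIO supports them; otherwise fall back to the existing per-file default.
if (deleteFunc != null) {
  Tasks.foreach(orphanFiles)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(deleteFunc::accept);
} else if (table.io() instanceof SupportsBulkOperations) {
  // the FileIO decides batch sizes and parallelism internally
  ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
} else {
  Tasks.foreach(orphanFiles)
      .noRetry()
      .executeWith(deleteExecutorService)
      .suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(table.io()::deleteFile);
}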

Contributor

@dramaticlly dramaticlly left a comment

LGTM. I was thinking about adding one for expire snapshots as well. But looking at this change, I noticed there's no test coverage included; can we add some unit tests for this?

"FileIO %s does not support bulk deletion",
table.io().getClass().getName());
SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I understand correctly, this batchDeletionSize determine how long of a list can be passed to fileIO batch deletion, so in order for fileIO to handle I guess this would be set to larger than expected one?

Say if we set batchDeletionSize = 50 in remove-orphan call and S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250, the S3 can only bucket 50 orphan files in a single call

@amogh-jahagirdar
Contributor Author

@szehon-ho @dramaticlly @aokolnychyi also for this PR, I'll just focus on Spark 3.3, and we can copy the implementation changes to other versions in separate PRs.

@amogh-jahagirdar
Contributor Author

LGTM. I was thinking about adding one for expire snapshots as well. But looking at this change, I noticed there's no test coverage included; can we add some unit tests for this?

Yeah for sure, I still need to add tests for this.

@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch from c315669 to e1a42d4 Compare August 1, 2022 20:50
@github-actions github-actions bot added the AWS label Aug 1, 2022
@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch from e1a42d4 to 6ed33d8 Compare August 1, 2022 20:51
@amogh-jahagirdar
Contributor Author

amogh-jahagirdar commented Aug 1, 2022

I've updated this PR to be based off my change here: https://github.com/apache/iceberg/pull/5379/files
That change should go in first (if we decide it's the right way to go) and then this PR can just focus on the integration with the procedure.

In the updated logic for the procedure, we will always use the delete func if it's specified.

If a delete function is not specified and the FileIO does not support bulk operations, we would use the default delete function. Otherwise, bulk delete is used by default.

What this means is that bulk operations will always be used (if the FileIO supports it) without considering user input. I was discussing with @dramaticlly whether we want more control over this and whether it should be specified in the action (something like useBulkDelete())?
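
For illustration, the opt-in could look something like this from the caller's side (useBulkDelete is hypothetical and does not exist on the action today):

SparkActions.get()
    .deleteOrphanFiles(table)
    .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
    .useBulkDelete(true)  // hypothetical option to opt in/out of the SupportsBulkOperations path
    .execute();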

Also, we are now delegating task management to the FileIO, which I think makes sense, but there's another argument that each procedure should control this, since failure handling or retries would depend on the desired behavior for the procedure. What are people's thoughts here? @dramaticlly @aokolnychyi @RussellSpitzer @karuppayya

@dramaticlly
Contributor

In the updated logic for the procedure, we will always use the delete func if it's specified.

If a delete function is not specified and the FileIO does not support bulk operations, we would use the default delete function. Otherwise, bulk delete is used by default.

What this means is that bulk operations will always be used (if the FileIO supports it) without considering user input. I was discussing with @dramaticlly whether we want more control over this and whether it should be specified in the action (something like useBulkDelete())?

Thanks @amogh-jahagirdar, I was mostly thinking along the same lines for #5412. In the ideal case, we should try to use the bulk operation if it's available to us (by inspecting the FileIO of the given table; S3FileIO is the only supported one as of now). On the other hand, I think we need to maintain backward compatibility: if an Iceberg user already provides a custom deleteFunc to the SparkAction, everything should work as expected.

In my change, I added a new public method called bulkDeleteFunc to allow overrides if needed. This provides a way for customization even for tables on S3FileIO, or it can be used to disable bulk deletion for troubleshooting and debugging needs.
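
Roughly, the override could be used like this (the exact name and signature in #5412 may differ; this is just a sketch, e.g. for a dry run):

SparkActions.get()
    .deleteOrphanFiles(table)
    .bulkDeleteFunc(paths -> LOG.warn("Dry run, would bulk-delete: {}", paths))  // proposed method, not merged
    .execute();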

Also, we are now delegating task management to the FileIO, which I think makes sense, but there's another argument that each procedure should control this, since failure handling or retries would depend on the desired behavior for the procedure. What are people's thoughts here?

Personally, I think delegating batching to the FileIO makes sense; adding an additional parameter like batchSize in the procedure call might cause confusion and be hard to get right. The downside I see is testability: I'd like more input on how to test this across classes. The current test classes all use HadoopTables, and picking HiveTable with a FileIO that supports SupportsBulkOperations seems like overkill for unit tests.

@szehon-ho
Collaborator

If a delete function is not specified and the FileIO does not support bulk operations, we would use the default delete function. Otherwise, bulk delete is used by default.

I think that's a good simplification too: if the user wants customization, they can deal with a non-bulk (slower) delete, but we throw an exception if they want a customized delete function AND bulk.

@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch from 6ed33d8 to c223e9f Compare August 2, 2022 03:07
…efault if it's supported and a custom delete func is not passed in.
@amogh-jahagirdar amogh-jahagirdar force-pushed the remove-orphan-files-deletion-batch-size branch from c223e9f to 13c2414 Compare August 2, 2022 03:14
@amogh-jahagirdar
Contributor Author

amogh-jahagirdar commented Aug 2, 2022

but we throw an exception if they want a customized delete function AND bulk.

I'm not sure throwing an exception when a user specifies a delete function and the FileIO supports bulk delete is the way to go, because then we're changing the behavior of the exposed deleteFunc API. I think if deleteFunc is set, the procedure should continue to use that as the source of truth regardless of bulk delete support. If we threw an exception, users' code would need to be rewritten if it's using S3FileIO and running this procedure with a custom delete. Let me know if I'm misunderstanding!

That being said, we are now changing the behavior if they do not specify a delete func and the FileIO supports bulk delete. This change is less intrusive because it changes how the procedure runs internally and is not really exposed to the user. Let me know what you think.

The downside I see is testability: I'd like more input on how to test this across classes. The current test classes all use HadoopTables, and picking HiveTable with a FileIO that supports SupportsBulkOperations seems like overkill for unit tests.

Yeah, this is the tricky part. My thinking on the testing is the following:

1.) We should validate at the procedure level that we are guaranteeing the existing behavior (failing at the expected points, using the parameters correctly, etc.). For the bulk delete case, I think we can hit that path through a spy object, although that's not a particularly clean way.

2.) Then we should guarantee that the FileIO which supports bulk delete (in this case, S3FileIO) has good test coverage to make sure its deleteFiles works properly.

If we do 1 and 2, we can get better confidence.
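
For 1, one option instead of a spy could be a small test double, roughly like this (the class and method names are made up for illustration):

// A HadoopFileIO that also implements SupportsBulkOperations and records the paths handed
// to deleteFiles, so a HadoopTables-based test can assert the bulk path was taken.
public class RecordingBulkFileIO extends HadoopFileIO implements SupportsBulkOperations {
  private final List<String> bulkDeleted = Lists.newArrayList();

  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) {
    pathsToDelete.forEach(bulkDeleted::add);
    pathsToDelete.forEach(this::deleteFile); // still delete so the test table stays consistent
  }

  public List<String> bulkDeleted() {
    return bulkDeleted;
  }
}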

@szehon-ho
Collaborator

szehon-ho commented Aug 2, 2022

I'm not sure throwing an exception when a user specifies a delete function and the FileIO supports bulk delete is the way to go, because then we're changing the behavior of the exposed deleteFunc API. I think if deleteFunc is set, the procedure should continue to use that as the source of truth regardless of bulk delete support. If we threw an exception, users' code would need to be rewritten if it's using S3FileIO and running this procedure with a custom delete. Let me know if I'm misunderstanding!

That being said, we are now changing the behavior if they do not specify a delete func and the FileIO supports bulk delete. This change is less intrusive because it changes how the procedure runs internally and is not really exposed to the user. Let me know what you think.

Maybe it was a bit of a misunderstanding; I was talking about the flag you are discussing with @dramaticlly (useBulkDelete). So I was thinking, if useBulkDelete is on && deleteFunc is set, then it's a misconfiguration.

But are we doing the flag? Or are you suggesting having deleteFunc always take precedence, i.e. if deleteFunc is set, always use the single-file deleteFunc; otherwise, if the FileIO supports bulk operations, automatically use the bulk delete?

Also, we are now delegating task management to the FileIO, which I think makes sense, but there's another argument that each procedure should control this, since failure handling or retries would depend on the desired behavior for the procedure. What are people's thoughts here? @dramaticlly @aokolnychyi @RussellSpitzer @karuppayya

What's the choice here? I suppose we will have to have an extra parameter on SupportsBulkOperations FileIOs to control retries, and this can be set by the various procedures? I think after #5379 it will be easy to implement, as S3FileIO can just pass that parameter to Tasks?

@jackye1995
Contributor

Closing as a duplicate of #6682.

@jackye1995 jackye1995 closed this Mar 10, 2023