Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Delegate bulk and prefix operations in ResolvingFileIO #8169

Closed
wants to merge 16 commits into from

Conversation

bryanck
Copy link
Contributor

@bryanck bryanck commented Jul 28, 2023

This PR adds implementing the SupportsPrefixOperations and SupportsBulkOperations interfaces in ResolvingFileIO. Currently prefix and bulk operations of the delegate FileIO aren't accessible. In the Spark delete orphan files action, this prevents bulk deletes from being used with ResolvingFileIO, for example. Currently all of the supported delegate FileIO classes implement those interfaces.

@github-actions github-actions bot added the core label Jul 28, 2023

PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
FileIO fileIO = io(iterator.peek());
if (!(fileIO instanceof SupportsPrefixOperations)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check prefix, but then throw an exception warning we don't support bulk

"FileIO doesn't support bulk operations: " + fileIO.getClass().getName());
}

((SupportsBulkOperations) fileIO).deleteFiles(() -> iterator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't check this cast

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also should probably pass through the original iterable. Just in case this is a closable iterable.

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
FileIO fileIO = io(prefix);
if (!(fileIO instanceof SupportsPrefixOperations)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks if the delegate can't actually list prefix

Copy link
Member

@RussellSpitzer RussellSpitzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs some tests but I think has some underlying issues. We can't expose that we support the various interfaces SupportsPrefix, SupportsBulk and then fail if the caller attempts to use them. If for example you were using a ResolvingFileIO and the underlying file IO didn't support prefix and bulk, this would break all usage of that IO.

When the underlying system doesn't support the interface we need to fall back to using the standard apis.

@bryanck
Copy link
Contributor Author

bryanck commented Aug 2, 2023

Part of the assumption was that all of the supported delegates already (or will) implement those interfaces (HadoopFileIO, GCSFileIO, and S3FileIO), and my thought was that a FileIO implementation would be required to support those before being added here. So currently there won't be any typecasting issues.

public class ResolvingFileIO implements FileIO, HadoopConfigurable {
/**
* FileIO implementation that uses location scheme to choose the correct FileIO implementation.
* Delegate FileIO implementations should support the mixin interfaces {@link
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to make this a requirement we should probably just add these as perquisites in the init code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me, I'll add that.

@bryanck
Copy link
Contributor Author

bryanck commented Aug 3, 2023

One note, if this change ends up being acceptable, it shouldn't be merged before #8168, as that adds the now-mandatory mixin interfaces to GCSFileIO.

}

PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
FileIO fileIO = io(iterator.peek());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we change the function io to return a FileIO which supportsPrefixOperations and BulkOperations if that's a requirement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I made this change and removed the typecasts.


PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
SupportsBulkOperations fileIO = io(iterator.peek());
fileIO.deleteFiles(() -> iterator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we resolve this? I think we probably want to pass the iterable here rather than the iterator to make sure we close everything properly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No not yet, I'm still thinking through the best solution. Passing the original iterable could cause an additional request to be made, i.e. one additional request for the peek, so I was hoping to avoid that.

String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
T io = (T) ioInstances.get(impl);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this to the map type as well? I always forget if Java can do this, I know Scala can

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had it that way, but it required changing the class declaration and caused the revapi check to fail, I'm not sure what the tolerance is to make that change

Copy link
Member

@RussellSpitzer RussellSpitzer Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we are forcing that, so the revapi change is appropriate

Ie, Runtime casting is just letting us sneak past the RevAPI static check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK great. I made this change and added the revapi exception

Copy link
Contributor Author

@bryanck bryanck Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After further thought, this isn't a great solution as it allows declaring a ResolvingFileIO with an incompatible generic type, e,g, new ResolvingFileIO<MyType>(), so I'll need to revert this change.

Copy link
Contributor Author

@bryanck bryanck Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe there is another way to declare the map as something like Map<String, FileIO & SupportsPrefixOperations & SupportsBulkOperations>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could always define a small inner abstract class, DelgateIO extends ....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, then update S3FileIO, GCSFileIO, and HadoopFileIO to implement that, which makes it more clear what you need to implement if developing a new delegate class. I'll make that change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to wait on the GCSFileIO PR to add prefix and bulk operations before that can implement this new interface, but we should wait for that anyway before merging this.

@@ -19,8 +19,16 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these imports are wrong and should be ...Assertions.assertThatThrownBy rather than AssertionsForClassTypes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I fixed that

@github-actions github-actions bot added the AWS label Aug 4, 2023
@github-actions github-actions bot added the API label Aug 4, 2023

@Test
public void testEnsureInterfaceImplementation() {
ResolvingFileIO testResolvingFileIO = spy(new ResolvingFileIO());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok, but what I was hoping we would have coverage for was that all delegates properly worked. This may be very difficult, I'm just looking for a way to confirm that the class will work at runtime. If we can force all the checks at compile time I'd be ok with that as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wouldn't be able to test the actual delegates without having the dependencies here (i.e. iceberg-aws and iceberg-gcp). I'll see if a service pattern could work where the delegate types register themselves with ResolvingFileIO, then we can have the compile time check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into using Java SPI, but I don't think it is a good idea as it could cause problems with shadow jars. Also it isn't much different than this solution. We can't register with ResolvingFileIO from the delegate class as the delegate class might not be loaded at all yet.

One option, somewhat similar to #7976 would be to remove SupportsPrefixOperations and then provide a default implementation for bulk delete for the orphan file case so the delegate doesn't need the mixin interface. Though I was hoping we could have the prefix operations also as they won't be accessible at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests for each of the delegate types to ensure they load. Though that doesn't prevent a new class from being added to the list that doesn't implement the interface.

@github-actions github-actions bot added the GCP label Aug 4, 2023
@bryanck
Copy link
Contributor Author

bryanck commented Aug 4, 2023

I'd like to close this PR in favor of #7976, which focuses on the immediate problem of orphan files not doing batch deletes. There are opportunities to improve FileIO resolution/delegation but this PR wasn't created with that fully in mind, so a new PR to address that would make more sense to me.

@bryanck
Copy link
Contributor Author

bryanck commented Aug 4, 2023

Apologies for taking up people's time on this, hopefully I or someone else will have an improved PR to address some of the issues raised.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants