-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Delegate bulk and prefix operations in ResolvingFileIO #8169
Conversation
|
||
PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator); | ||
FileIO fileIO = io(iterator.peek()); | ||
if (!(fileIO instanceof SupportsPrefixOperations)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We check prefix, but then throw an exception warning we don't support bulk
"FileIO doesn't support bulk operations: " + fileIO.getClass().getName()); | ||
} | ||
|
||
((SupportsBulkOperations) fileIO).deleteFiles(() -> iterator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We didn't check this cast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also should probably pass through the original iterable. Just in case this is a closable iterable.
@Override | ||
public Iterable<FileInfo> listPrefix(String prefix) { | ||
FileIO fileIO = io(prefix); | ||
if (!(fileIO instanceof SupportsPrefixOperations)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks if the delegate can't actually list prefix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs some tests but I think has some underlying issues. We can't expose that we support the various interfaces SupportsPrefix, SupportsBulk and then fail if the caller attempts to use them. If for example you were using a ResolvingFileIO and the underlying file IO didn't support prefix and bulk, this would break all usage of that IO.
When the underlying system doesn't support the interface we need to fall back to using the standard apis.
Part of the assumption was that all of the supported delegates already (or will) implement those interfaces (HadoopFileIO, GCSFileIO, and S3FileIO), and my thought was that a FileIO implementation would be required to support those before being added here. So currently there won't be any typecasting issues. |
public class ResolvingFileIO implements FileIO, HadoopConfigurable { | ||
/** | ||
* FileIO implementation that uses location scheme to choose the correct FileIO implementation. | ||
* Delegate FileIO implementations should support the mixin interfaces {@link |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to make this a requirement we should probably just add these as perquisites in the init code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense to me, I'll add that.
One note, if this change ends up being acceptable, it shouldn't be merged before #8168, as that adds the now-mandatory mixin interfaces to |
} | ||
|
||
PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator); | ||
FileIO fileIO = io(iterator.peek()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we change the function io to return a FileIO which supportsPrefixOperations and BulkOperations if that's a requirement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion, I made this change and removed the typecasts.
|
||
PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator); | ||
SupportsBulkOperations fileIO = io(iterator.peek()); | ||
fileIO.deleteFiles(() -> iterator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we resolve this? I think we probably want to pass the iterable here rather than the iterator to make sure we close everything properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No not yet, I'm still thinking through the best solution. Passing the original iterable could cause an additional request to be made, i.e. one additional request for the peek, so I was hoping to avoid that.
String impl = implFromLocation(location); | ||
FileIO io = ioInstances.get(impl); | ||
T io = (T) ioInstances.get(impl); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add this to the map type as well? I always forget if Java can do this, I know Scala can
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally had it that way, but it required changing the class declaration and caused the revapi check to fail, I'm not sure what the tolerance is to make that change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well we are forcing that, so the revapi change is appropriate
Ie, Runtime casting is just letting us sneak past the RevAPI static check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK great. I made this change and added the revapi exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After further thought, this isn't a great solution as it allows declaring a ResolvingFileIO with an incompatible generic type, e,g, new ResolvingFileIO<MyType>()
, so I'll need to revert this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe there is another way to declare the map as something like Map<String, FileIO & SupportsPrefixOperations & SupportsBulkOperations>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could always define a small inner abstract class, DelgateIO extends ....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, then update S3FileIO, GCSFileIO, and HadoopFileIO to implement that, which makes it more clear what you need to implement if developing a new delegate class. I'll make that change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll need to wait on the GCSFileIO PR to add prefix and bulk operations before that can implement this new interface, but we should wait for that anyway before merging this.
@@ -19,8 +19,16 @@ | |||
package org.apache.iceberg.io; | |||
|
|||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; | |||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe these imports are wrong and should be ...Assertions.assertThatThrownBy
rather than AssertionsForClassTypes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I fixed that
|
||
@Test | ||
public void testEnsureInterfaceImplementation() { | ||
ResolvingFileIO testResolvingFileIO = spy(new ResolvingFileIO()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is ok, but what I was hoping we would have coverage for was that all delegates properly worked. This may be very difficult, I'm just looking for a way to confirm that the class will work at runtime. If we can force all the checks at compile time I'd be ok with that as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We wouldn't be able to test the actual delegates without having the dependencies here (i.e. iceberg-aws and iceberg-gcp). I'll see if a service pattern could work where the delegate types register themselves with ResolvingFileIO, then we can have the compile time check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked into using Java SPI, but I don't think it is a good idea as it could cause problems with shadow jars. Also it isn't much different than this solution. We can't register with ResolvingFileIO from the delegate class as the delegate class might not be loaded at all yet.
One option, somewhat similar to #7976 would be to remove SupportsPrefixOperations
and then provide a default implementation for bulk delete for the orphan file case so the delegate doesn't need the mixin interface. Though I was hoping we could have the prefix operations also as they won't be accessible at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests for each of the delegate types to ensure they load. Though that doesn't prevent a new class from being added to the list that doesn't implement the interface.
I'd like to close this PR in favor of #7976, which focuses on the immediate problem of orphan files not doing batch deletes. There are opportunities to improve FileIO resolution/delegation but this PR wasn't created with that fully in mind, so a new PR to address that would make more sense to me. |
Apologies for taking up people's time on this, hopefully I or someone else will have an improved PR to address some of the issues raised. |
This PR adds implementing the
SupportsPrefixOperations
andSupportsBulkOperations
interfaces inResolvingFileIO
. Currently prefix and bulk operations of the delegate FileIO aren't accessible. In the Spark delete orphan files action, this prevents bulk deletes from being used with ResolvingFileIO, for example. Currently all of the supported delegate FileIO classes implement those interfaces.