Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Add an api for Parallelizing Manifest Reading in ManifestGroup #1420

Conversation

RussellSpitzer
Copy link
Member

@RussellSpitzer RussellSpitzer commented Sep 3, 2020

This is one of two WIP PR's to demonstrate some approaches to parallelizing the job planning phase of Spark Reads.

The other is located at #1421

To add distribtued manifest reading to table scans, we allow for a ManifestProcessor
to be used in a TableScan. This class is utilzied when ManifestGroup processes
Manifest files. The default implementation mimics the current code by just wrapping
the reading code in a CloseableIterable. A distributed Spark implementation is also
provided which reads all of the manifests remotely before returning valid entries
for further processing.

To add distribtued manifest reading to table scans, we allow for a ManifestProcessor
to be used in a TableScan. This class is utilzied when ManifestGroup processes
Manifest files. The default implementation mimics the current code by just wrapping
the reading code in a CloseableIterable. A distributed Spark implementation is also
provided which reads all of the manifests remotely before returning valid entries
for further processing.
* @param <T> The ManifestEntry Type being read from Manifest files
*/
public interface Func<T extends ContentFile<T>> extends BiFunction<ManifestFile, FileIO,
CloseableIterable<ManifestEntry<T>>>, Serializable {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is a WIP, so I'm partially commenting to follow along.

But I'm not a huge fan of this interface being named Func. Its usage also feels somewhat clunky to me to be honest, but I'm more of a scala developer than a java developer by day so that could possibly just be my own bias showing.

Overall this is shaping up to be a great addition though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check out the other PR too :) And if you have another approach I'd be glad to look into that as well. This is in my opinion definitely a more kludgy approach in Java. The only reason we need this interface is because otherwise you have to do
(All of that type info) lambda, this is basically a type alias java will use to covert the lambda into a real function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Ok. Thank you so much! As I've mentioned, in my day job I'm more of a scala developer so I'm much more used to working with all of the well enriched type stuff that's available there. Thankfully I've finally stood all of this up on my own K8s cluster somewhere so I can continue to get more experienced with practical usage of Iceberg (especially between query systems).

When you explain it that way though, it makes perfect sense. :)

As for the approach being kludgy, if it gets the job done / speeds up manifest reading and makes things like exception stack traces etc more readable than a very large number of lambda functions, I'm not necessarily opposed at all. As a person that spends a lot of time in their current position as a sort of developer advocate helping people with the more esoteric parts of Spark and Flink etc, I can absolutely get behind more simplified type info and less concern with the serializability of functions, especially if the lambdas get raised to real functions so that they have more readable stack traces. :)

I'll be sure to check out the other approach. And if I can come up with any approaches or modifications to this approach that might be cleaner, I'll definitely let you know. I think that java doc comment on the interface is likely sufficient to explain Func. It's possible that maybe the relatively generic name Func is what tripped me up, but I can't think of a better name that doesn't conflict with other current Iceberg concepts (e.g. Transformers, etc).

* @param <T> The ManifestEntry Type being read from Manifest files
*/
public interface Func<T extends ContentFile<T>> extends BiFunction<ManifestFile, FileIO,
CloseableIterable<ManifestEntry<T>>>, Serializable {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Ok. Thank you so much! As I've mentioned, in my day job I'm more of a scala developer so I'm much more used to working with all of the well enriched type stuff that's available there. Thankfully I've finally stood all of this up on my own K8s cluster somewhere so I can continue to get more experienced with practical usage of Iceberg (especially between query systems).

When you explain it that way though, it makes perfect sense. :)

As for the approach being kludgy, if it gets the job done / speeds up manifest reading and makes things like exception stack traces etc more readable than a very large number of lambda functions, I'm not necessarily opposed at all. As a person that spends a lot of time in their current position as a sort of developer advocate helping people with the more esoteric parts of Spark and Flink etc, I can absolutely get behind more simplified type info and less concern with the serializability of functions, especially if the lambdas get raised to real functions so that they have more readable stack traces. :)

I'll be sure to check out the other approach. And if I can come up with any approaches or modifications to this approach that might be cleaner, I'll definitely let you know. I think that java doc comment on the interface is likely sufficient to explain Func. It's possible that maybe the relatively generic name Func is what tripped me up, but I can't think of a better name that doesn't conflict with other current Iceberg concepts (e.g. Transformers, etc).

/**
* Doc doc doc
*/
TableScan withManifestProcessor(ManifestProcessor processor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you did here to get past the linter ;-)

If we decide to use this approach, let's be sure to not merge in Doc doc doc but instead a more descriptive comment or even a TODO comment. However since this is a WIP it's not worth your time to explain a pretty self documenting method name for the purpose of appeasing the linter. Leaving this comment so we don't forget to update if we do merge this in. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course :)

* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this not match the rest of the files in the project? I'd prefer not to reformat headers in a different PR if we can avoid it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this is just a proof of concept PR, to compare with the other idea demonstrated here: #1421

I think once one is decided upon / if either of the versions are decided upon, then @RussellSpitzer will be updating it to be properly formatted etc. Please feel free to correct me if I'm wrong though Russell 🙂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep sorry, I was just trying to get to a prototype real fast here.

@@ -25,7 +20,7 @@
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

interface ManifestEntry<F extends ContentFile<F>> {
public interface ManifestEntry<F extends ContentFile<F>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've really tried to avoid making ManifestEntry public because exposing it makes the API much harder to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on why making ManifestEntry public makes the API harder to use? Is it just because we then have to account for the fact that this is a public API?

I personally am more fond of this proof of concept as opposed to the other one, so I'd love to get insight into your comment here.

If it's just because we would then have to treat this API public, is it possible something could be done to mark it as private to iceberg, equivalent to scala's private[iceberg]?

@@ -131,6 +133,9 @@ public TableScan useSnapshot(long scanSnapshotId) {
ops, table, schema, context.useSnapshotId(scanSnapshotId));
}




Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: unnecessary whitespace change

@RussellSpitzer
Copy link
Member Author

We decided to go with the other approach, Goodnight sweet prince

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants