InputFormat support for Iceberg #843


Merged
merged 8 commits into from
Apr 6, 2020

Conversation

rdsr
Contributor

@rdsr rdsr commented Mar 16, 2020

This addresses #170
The current code is pretty rough. I'm sending it out to get early feedback on the approach.
Most of the scaffolding is there. There is support for parameterized in-memory records. Read functions for data formats like Avro, Parquet, and ORC are supported through the ReadSupport interface. The same interface also allows adding identity partition columns to the input row.
I support parameterized input records because much of the use of the InputFormat API is for supporting MR-based engines like Pig and Hive.

I need feedback on the ReadSupport API to see whether folks are OK with it. Detailed code feedback can be omitted for now. Once people are OK with the high-level approach, I will clean up the code and add tests. Feedback on how to test the input format is welcome as well.

cc @rdblue @teabot @jerryshao @massdosage

This patch is based on the work of @guilload
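As a rough illustration of the approach, here is a minimal sketch of what a parameterized ReadSupport-style interface could look like. The names and signatures below are assumptions for discussion, not the code in this PR:

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

// Hypothetical sketch of a per-format, per-engine read abstraction. T is the engine's
// in-memory row type (for example Iceberg's generic Record, a Pig Tuple, or a Hive Writable).
public interface ReadSupport<T> {
  // Open one file of a scan task and return its rows projected to readSchema,
  // dispatching to the Avro/Parquet/ORC value readers for T.
  CloseableIterable<T> read(InputFile file, FileScanTask task, Schema readSchema);

  // Attach identity partition column values to a row that was read from the file.
  T withPartitionColumns(T row, PartitionSpec spec, StructLike partitionData);
}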

@jerryshao
Contributor

I think @teabot and team are also working on this; I'm not sure how we should collaborate. Maybe we should create a milestone and break down all the tasks, so that we can parallelize the work and reduce conflicts.

}
}

private Iterator<T> open(FileScanTask currentTask, Schema readSchema) {
Member

I read the Pig and Spark source readers; it seems we could abstract the FileScanTask open(..) method with readSchema, so that all the compute engines can share that part of the code. Maybe we can define it as:

Iterator<T> open(FileScanTask currentTask, Schema readSchema, ReadSupport<T> readSupport)

I think it could be a file-level abstraction.

Contributor

@jerryshao jerryshao Mar 16, 2020

I have the same feeling: with more frameworks supported, there could be a lot of duplicated code. It would be better to have an abstraction layer that could be adapted to all the frameworks.

Contributor Author

Pig will eventually rely on this InputFormat, but I agree in general: if we can abstract away common code, that would be great.

Contributor

Yeah, there may be a good way to extract common code. For now, let's do that in a separate PR though to keep this focused on the MR support.
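For context, here is a minimal sketch of the kind of shared, file-level open(..) discussed above, reusing the hypothetical ReadSupport<T> sketched earlier; the method shape and names are assumptions, not code that exists in this PR:

import java.util.Iterator;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;

// Hypothetical: a file-level open(..) any engine could share by supplying its own ReadSupport<T>.
static <T> Iterator<T> open(FileIO io, FileScanTask task, Schema readSchema, ReadSupport<T> readSupport) {
  InputFile file = io.newInputFile(task.file().path().toString());
  // The format-specific read (Avro / Parquet / ORC) is delegated to the ReadSupport implementation.
  return readSupport.read(file, task, readSchema).iterator();
}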

@rdsr
Contributor Author

rdsr commented Mar 16, 2020

I think @teabot and team are also working on this; I'm not sure how we should collaborate. Maybe we should create a milestone and break down all the tasks, so that we can parallelize the work and reduce conflicts.

+1. I'll create those tasks.

@massdosage
Contributor

OK, so the InputFormat we've been working on (https://github.com/ExpediaGroup/hiveberg/blob/master/src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java) implements the org.apache.hadoop.mapred.InputFormat API. I see the one above implements the org.apache.hadoop.mapreduce.InputFormat API. Which version of Hive is the latter targeting? I'm pretty sure it won't work on Hive 2.x, as we tried that first and then moved to the mapred API, which we can confirm is working for a simple read path (and we also have a Hive unit test for this at https://github.com/ExpediaGroup/hiveberg/blob/master/src/test/java/com/expediagroup/hiveberg/TestIcebergInputFormat.java). See https://stackoverflow.com/questions/33235199/hadoop-hive-serde-input-format-must-implement-inputformat for some more information on the two APIs.

One possible approach here would be for us to move our code from the "hiveberg" repo back into a PR against Iceberg, but we would need someone from the Iceberg project to help us with the dependency and version hell that this leads to (specifically Hive, Guava, and Avro version conflicts). We could then have a mapred input format and a mapreduce input format, and at that point refactor out the commonality between them.

The InputFormat is also only one part of the picture. We're also intending to add a SerDe (see https://github.com/ExpediaGroup/hiveberg/blob/master/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java) and related classes, and ultimately potentially bundle this all into a Hive StorageHandler. This and the rest of the roadmap we have in mind can be found at https://github.com/ExpediaGroup/hiveberg/issues.

Again, we would happily move all this work over here, and I think that would make it a lot easier to have visibility into all the issues and to split up the work to remove duplication, etc.
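For readers less familiar with the two Hadoop APIs being contrasted here, the key difference is the interface each engine programs against. The signatures below are the standard Hadoop ones, shown only for comparison:

// org.apache.hadoop.mapred -- the "old" API, which the Hive SerDe/StorageHandler path expects:
public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}

// org.apache.hadoop.mapreduce -- the "new" API, which this PR implements:
public abstract class InputFormat<K, V> {
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
  public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;
}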

TableScan scan = table.newScan();
//TODO add caseSensitive, snapshot id etc..

Expression filterExpression = SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
Contributor

Looks like this serialization (java serializable/gzip/base64) is only used for the filter expression. We should consider (in a separate commit) using a parser for Iceberg expressions so that we can convert them to infix strings instead and parse them when needed. That makes the job configuration readable, which is always a good thing.

Contributor

We could also serialize to/from JSON if we wanted something easier than including or building a parser.
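For reference, the kind of helper the snippet above relies on typically looks like the following. This is a generic Java-serialization/Base64 sketch (gzip step omitted), not necessarily the PR's SerializationUtil:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Generic sketch: round-trip a java.io.Serializable object (such as a filter expression)
// through Base64 so it can be stored as a string in the job configuration.
public class SerializationUtil {
  private SerializationUtil() {}

  public static String serializeToBase64(Serializable obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to serialize object", e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  @SuppressWarnings("unchecked")
  public static <T> T deserializeFromBase64(String base64) {
    byte[] bytes = Base64.getDecoder().decode(base64);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Failed to deserialize object", e);
    }
  }
}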

@rdsr
Contributor Author

rdsr commented Mar 16, 2020

Thanks @massdosage.
This implements the MRv2 InputFormat API. Since Pig implements MRv2, and it is the newer API compared to MRv1, I chose to implement this one.

I think we should be able to add MRv1 support on top of MRv2. I had seen some libraries that provide out-of-the-box support for converting MRv2 to MRv1. I was unable to find that library just now, but I vaguely recall that maybe Twitter had built something around this.

@rdsr
Contributor Author

rdsr commented Mar 18, 2020

Addressed some review comments, specifically around having a higher-level way of specifying the in-memory data model.
If folks can eyeball the PR, that would be great. Three main things I need feedback on:

  1. The in-memory data model approach
  2. The logic for handling identity partition columns
  3. How we address the mapred InputFormat, as described above

@rdsr
Contributor Author

rdsr commented Mar 18, 2020

Once folks are OK with the approach, I'll start writing tests.

@jerryshao
Contributor

Any suggestions on how we support the mapred API?
We can go the route of https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java.
Any other ideas?

I'm not sure if there are other wrappers, but I think this should be fine.

@rdsr
Contributor Author

rdsr commented Mar 18, 2020

I'm not sure if there are other wrappers, but I think this should be fine.

A long time back I did see a better-written converter from MRv2 to MRv1, but I can't remember the name of the library now.
I guess if we go the route of the converter, we'd have to copy-paste that code instead of adding the dependency, which could bring in a lot of unneeded artifacts.

@rdsr
Contributor Author

rdsr commented Mar 18, 2020

Any suggestions on how we support the mapred API?
We can go the route of https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java.
Any other ideas?
@rdblue , @massdosage, @jerryshao

I'll create a ticket for this

@massdosage
Contributor

I've raised a PR against @rdsr's branch which could be a first step toward having both InputFormats; we could then look into factoring the commonality out (possibly using something like ElephantBird). Alternatively, we could raise a separate PR for the mapreduce InputFormat and do the refactoring of the commonality later.

It would be good to decide who is going to do what and in what order. For now we are working on the Hive SerDe and looking into improving the Hive read path (other file formats, partition predicate and projected column pushdown, etc.).

@rdblue
Contributor

rdblue commented Mar 18, 2020

Do we need to support the mapred format? If the intent is to support Pig, Hive, and MR applications then isn't a mapreduce format sufficient?

@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
  Configuration conf = newContext.getConfiguration();
  CombinedScanTask task = ((IcebergSplit) split).task;
Contributor

We should note in documentation that this InputFormat must perform its own split planning and doesn't accept FileSplit instances. I think it's fairly common to create and pass file splits, so we may just want to accept them and treat them as single-file tasks.

Contributor Author

If the splits are only created using Iceberg APIs, I'd imagine we will only be getting IcebergSplits, no?

Contributor

Yes, if splits are produced using getSplits then we'd be fine. I seem to remember some problem doing that with Hive for Parquet, though. We ended up using regular FileSplit there.


splits = Lists.newArrayList();
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
  tasksIterable.forEach(task -> splits.add(new IcebergSplit(conf, task)));
Contributor

I just realized that we also need to assert that the residual for each file in the task is either null or Expressions.alwaysTrue. That will guarantee that the filter expression is completely satisfied and no additional rows will be returned.

Alternatively, we could run the residual to filter every row in the reader, but it's probably simpler for now to open that as a follow-up issue.

Contributor Author

I think Pig and Hive could apply the residuals on their own if we return the pushed-down filters as is, like we do in Spark. For standalone MR, we can run the residual like we do for IcebergGenerics. For now I'll do as you mentioned, but only for standalone MR.

Contributor

I like that this applies the residuals for generics, but I think we need to be a bit safer for the others. I think we should make a config option to enable the current behavior with Pig and Hive object models. If it is set, then Pig and Hive are expected to apply the final filter on their own. If not, then we assert that the residual is null or alwaysTrue. I think it's much safer to make skipping residuals opt-in like that.

Contributor Author

Fixed.

Contributor Author

@rdsr rdsr Mar 24, 2020

Added follow up for residual evaluation for Iceberg generics - #866
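For reference, running the residual against each row, along the lines discussed above, could look roughly like this. Evaluator is Iceberg's row-level expression evaluator, but the wiring shown is an assumption rather than this PR's code:

import java.util.Iterator;
import com.google.common.collect.Iterators;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Evaluator;

// Hypothetical: drop rows that do not satisfy the file scan task's residual expression.
static Iterator<Record> applyResidual(Iterator<Record> rows, FileScanTask task, Schema readSchema) {
  Evaluator filter = new Evaluator(readSchema.asStruct(), task.residual());
  return Iterators.filter(rows, filter::eval);
}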

}

private static void checkResiduals(Configuration conf, CombinedScanTask task) {
  boolean platformAppliesFilter = conf.getBoolean(PLATFORM_APPLIES_FILTER_RESIDUALS, false);
Contributor

Minor: this calls getBoolean and getEnum for every split. It would be nice to get those outside the loop and pass them in.

  task.files().forEach(fileScanTask -> {
    Expression residual = fileScanTask.residual();
    if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
      throw new RuntimeException(
Contributor

How about UnsupportedOperationException since this will eventually be implemented?

Contributor Author

Well, this would only be implemented for the GENERIC data model, not for Pig and Hive. Thoughts on keeping the same exception, or do you think we should implement residual evaluation for the Pig and Hive data models?

Contributor

My rationale was that we would accept a patch to add residual filtering for Hive and Pig, so why not consider it a future feature? Should someone choose to contribute it I think it would be valuable.

Contributor Author

@rdsr rdsr Apr 4, 2020

OK, that makes sense. Thanks!

Contributor Author

Kept as UnsupportedOperationException.

}

private static Record genericRecordWithPartitionsColumns(
    Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) {
Contributor

There are tests for this in Spark here: 6cafdab

You might consider adding some similar tests.

Contributor Author

Added similar tests. Had to change the logic of handling identity columns.
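To make the identity partition handling concrete, here is a hedged sketch of copying a record and filling the identity partition columns. The combined-schema assumption and the copy logic are illustrative only, not the exact logic of genericRecordWithPartitionsColumns in this PR:

import java.util.List;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

// Hypothetical: copy the columns read from the file into a record over a schema that also
// contains the identity partition columns, then fill those columns from the partition tuple.
// Assumes readSchemaWithPartitions already includes the identity partition columns by name.
static Record withIdentityPartitionColumns(
    Record record, Schema readSchemaWithPartitions, PartitionSpec spec, StructLike partition) {
  GenericRecord result = GenericRecord.create(readSchemaWithPartitions);

  // Copy the data columns that were actually read from the file.
  for (Types.NestedField field : record.struct().fields()) {
    result.setField(field.name(), record.getField(field.name()));
  }

  // Fill identity partition columns from the partition data, by position in the spec.
  List<PartitionField> fields = spec.fields();
  for (int pos = 0; pos < fields.size(); pos += 1) {
    PartitionField partitionField = fields.get(pos);
    if ("identity".equals(partitionField.transform().toString())) {
      result.setField(partitionField.name(), partition.get(pos, Object.class));
    }
  }
  return result;
}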

@rdblue
Contributor

rdblue commented Apr 3, 2020

@rdsr, this looks almost ready to me. The main thing is to add the MR tests that use a mini-DFS and mini-MR cluster.

@rdsr
Contributor Author

rdsr commented Apr 3, 2020

@rdblue, regarding mini-DFS and mini-MR: I'm facing classpath issues, as explained here: #843 (comment). I've also outlined an approach for adding a test using iceberg-mr-runtime and iceberg-mr-runtime-test. Let me know if the mini-DFS test can be addressed in a separate PR.

@rdsr rdsr force-pushed the mr_generic branch 4 times, most recently from 1aef1bd to 7cb3dd0 on April 6, 2020 00:15
@rdblue
Contributor

rdblue commented Apr 6, 2020

Looks good! I merged this. Thanks for the contributions and reviews, everyone!
