InputFormat support for Iceberg #843
Conversation
I think @teabot's team is also working on this; I'm not sure how we should collaborate. Maybe we should create a milestone and break down all the tasks, so that we can parallelize the work and reduce conflicts.
```java
  }
}

private Iterator<T> open(FileScanTask currentTask, Schema readSchema) {
```
I read the Pig & Spark source readers; it seems we could abstract the `FileScanTask` open(..) method with `readSchema`, so that all the compute engines can share that part of the code. Maybe we could define it as `Iterator<T> open(FileScanTask currentTask, Schema readSchema, ReadSupport<T> readSupport)`.

I think it could be a file-level abstraction.
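To make the suggestion concrete, here is a minimal hypothetical sketch of such a file-level abstraction. The `ReadSupport` shape and the `readGenericRecords` helper are illustrative assumptions, not the actual Iceberg API:

```java
import java.util.Collections;
import java.util.Iterator;

import com.google.common.collect.Iterators;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

public class SharedFileReader {

  // Assumed engine hook: map a generic Iceberg record to the engine's row type.
  public interface ReadSupport<T> {
    T convert(Record record);
  }

  // Shared entry point: engines reuse the task/schema plumbing and differ only
  // in the ReadSupport<T> they pass in.
  public static <T> Iterator<T> open(FileScanTask currentTask, Schema readSchema, ReadSupport<T> readSupport) {
    Iterator<Record> generics = readGenericRecords(currentTask, readSchema);
    return Iterators.transform(generics, readSupport::convert);
  }

  // Stand-in for the format-specific (Avro/Parquet/ORC) reader dispatch that
  // the shared layer would own.
  private static Iterator<Record> readGenericRecords(FileScanTask task, Schema readSchema) {
    return Collections.emptyIterator(); // placeholder
  }
}
```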
I have the same feeling. With more frameworks supported, there could be a lot of duplicated code. It would be better to have an abstraction layer that could be adapted to all the frameworks.
Pig will eventually rely on this InputFormat, but I agree generally: if we can abstract away common code, that would be great.
Yeah, there may be a good way to extract common code. For now, let's do that in a separate PR though to keep this focused on the MR support.
+1. I'll create those tasks.
One possible approach here would be to move our code from the "hiveberg" repo back into a PR against Iceberg, but we would need someone from the Iceberg project to help us with the dependency and version hell this leads to (specifically Hive, Guava and Avro version conflicts).

The InputFormat is also only one part of the picture: we're also intending to add a SerDe (see https://github.com/ExpediaGroup/hiveberg/blob/master/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java) and related classes, and ultimately potentially bundle this all into a Hive StorageHandler. This and the other roadmap items we have in mind can be found at https://github.com/ExpediaGroup/hiveberg/issues. Again, we would happily move all this work over here, and I think that would make it a lot easier to have visibility on all the issues, split the work to remove duplication, etc.
```java
TableScan scan = table.newScan();
// TODO add caseSensitive, snapshot id etc.

Expression filterExpression = SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
```
Looks like this serialization (java serializable/gzip/base64) is only used for the filter expression. We should consider (in a separate commit) using a parser for Iceberg expressions so that we can convert them to infix strings instead and parse them when needed. That makes the job configuration readable, which is always a good thing.
We could also serialize to/from JSON if we wanted something easier than including or building a parser.
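As a quick illustration of the mechanism under discussion — a sketch only: the config key string here is an assumption, and `serializeToBase64` is assumed to be the counterpart of the `deserializeFromBase64` call shown in the context above:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.SerializationUtil;

public class FilterConfigRoundTrip {
  // Assumed key name for illustration; the InputFormat refers to it as FILTER_EXPRESSION.
  private static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Expression filter = Expressions.greaterThan("id", 100);

    // Current approach: the configuration value is an opaque base64 blob ...
    conf.set(FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));

    // ... which only becomes meaningful again after deserialization. An infix
    // string or JSON form would keep the job configuration human-readable.
    Expression restored = SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
    System.out.println(restored);
  }
}
```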
Thanks @massdosage. I think we should be able to add MRv1 support on top of MRv2. I had seen some libraries which provide out-of-the-box support for converting MRv2 to MRv1. I was unable to find that library just now, but I vaguely recollect that Twitter may have built something around this.
Any suggestions on how we support the mapred API?
Addressed some review comments.

Once folks are OK with the approach, I'll start writing tests.
I'm not sure if there are other wrappers, but I think this should be fine.
A long time back I did see a better-written converter from MRv2 to MRv1, but I can't remember the name of the library now.
I'll create a ticket for this.
I've raised a PR against @rdsr's branch which could be a first step to having both InputFormats; we could then look into factoring the commonality out (possibly using something like ElephantBird). Alternatively, we could raise a separate PR for the mapred InputFormat. It would be good to decide on who is going to do what, and in what order. For now we are working on the Hive SerDe and looking into improving the Hive read path (other file formats, partition predicate and projected column pushdown, etc.).
Do we need to support the …
```java
@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
  Configuration conf = newContext.getConfiguration();
  CombinedScanTask task = ((IcebergSplit) split).task;
```
We should note in documentation that this InputFormat must perform its own split planning and doesn't accept `FileSplit` instances. I think it's fairly common to create and pass file splits, so we may just want to accept them and treat them as single-file tasks.
If the splits are only created using Iceberg APIs, I'd imagine we will only be getting `IcebergSplit`s, no?
Yes, if splits are produced using `getSplits` then we'd be fine. I seem to remember some problem doing that with Hive for Parquet, though. We ended up using regular `FileSplit` there.
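A rough sketch of what accepting both split types could look like in the record reader. This is hypothetical, not the PR's code: `IcebergSplit` refers to the PR's split class (stubbed here), and the `toSingleFileTask` conversion helper is a placeholder assumption:

```java
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.iceberg.CombinedScanTask;

public class SplitDispatch {

  // Resolve a scan task from either split type, per the suggestion above.
  static CombinedScanTask taskFor(InputSplit split) {
    if (split instanceof IcebergSplit) {
      // The normal path: splits planned by this InputFormat's getSplits.
      return ((IcebergSplit) split).task;
    } else if (split instanceof FileSplit) {
      // Fallback: treat a plain FileSplit as a single-file task.
      return toSingleFileTask((FileSplit) split);
    }
    throw new IllegalArgumentException("Unsupported split: " + split.getClass().getName());
  }

  // Placeholder: would look up the matching data file in the table metadata
  // and wrap it as a one-file CombinedScanTask.
  private static CombinedScanTask toSingleFileTask(FileSplit split) {
    throw new UnsupportedOperationException("sketch only");
  }
}
```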
@rdblue, HiveStorageHandler (https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java) only supports the mapred API.
```java
splits = Lists.newArrayList();
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
  tasksIterable.forEach(task -> splits.add(new IcebergSplit(conf, task)));
```
I just realized that we also need to assert that the residual for each file in the task is either `null` or `Expressions.alwaysTrue()`. That will guarantee that the filter expression is completely satisfied and no additional rows will be returned.

Alternatively, we could run the residual to filter every row in the reader, but it's probably simpler for now to open that as a follow-up issue.
I think Pig and Hive could apply the residuals on their own if we return the pushed-down filters as-is, like we do in Spark. For standalone MR, we can run the residual like we do for IcebergGenerics; for now I'll do as you mentioned, but only for standalone MR.
I like that this applies the residuals for generics, but I think we need to be a bit safer for the others. I think we should make a config option to enable the current behavior with Pig and Hive object models. If it is set, then Pig and Hive are expected to apply the final filter on their own. If not, then we assert that the residual is null or alwaysTrue. I think it's much safer to make skipping residuals opt-in like that.
Fixed.
Added a follow-up for residual evaluation for Iceberg generics: #866
```java
}

private static void checkResiduals(Configuration conf, CombinedScanTask task) {
  boolean platformAppliesFilter = conf.getBoolean(PLATFORM_APPLIES_FILTER_RESIDUALS, false);
```
Minor: this calls `getBoolean` and `getEnum` for every split. It would be nice to get those outside the loop and pass them in.
```java
task.files().forEach(fileScanTask -> {
  Expression residual = fileScanTask.residual();
  if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
    throw new RuntimeException(
```
How about `UnsupportedOperationException`, since this will eventually be implemented?
Well, this would only be implemented for the GENERIC data model, not for Pig and Hive. Thoughts on keeping the same exception, or do you think we should implement residual evaluation for the Pig and Hive data models?
My rationale was that we would accept a patch to add residual filtering for Hive and Pig, so why not consider it a future feature? Should someone choose to contribute it, I think it would be valuable.
OK, that makes sense. Thanks!
Kept as `UnsupportedOperationException`.
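Putting this thread together, a hedged sketch of the resulting check. The `PLATFORM_APPLIES_FILTER_RESIDUALS` flag comes from the context lines above (its key string here is assumed), and the exception message and exact structure are illustrative, not the merged code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ResidualCheck {
  // Assumed key name, for illustration only.
  static final String PLATFORM_APPLIES_FILTER_RESIDUALS = "iceberg.mr.platform.applies.filter.residuals";

  static void checkResiduals(Configuration conf, CombinedScanTask task) {
    // Opt-in, per the review above: when set, Pig/Hive apply the final filter
    // themselves and residuals are allowed to pass through.
    boolean platformAppliesFilter = conf.getBoolean(PLATFORM_APPLIES_FILTER_RESIDUALS, false);
    if (platformAppliesFilter) {
      return;
    }
    task.files().forEach(fileScanTask -> {
      Expression residual = fileScanTask.residual();
      if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
        // UnsupportedOperationException, per the review outcome above.
        throw new UnsupportedOperationException(
            "Filter expression " + residual + " is not completely satisfied; " +
            "residual evaluation is not implemented for this data model");
      }
    });
  }
}
```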
```java
}

private static Record genericRecordWithPartitionsColumns(
    Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) {
```
There are tests for this in Spark here: 6cafdab. You might consider adding some similar tests.
Added similar tests. Had to change the logic of handling identity columns.
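For readers following along, a hedged sketch of the idea behind attaching identity partition columns to a generic record. The joined-schema construction and the identity-transform check are assumptions for illustration; this is not the merged implementation:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class PartitionColumns {

  static Record genericRecordWithPartitionsColumns(
      Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) {
    // Build a row over the data columns plus the identity partition columns.
    List<Types.NestedField> fields = new ArrayList<>(record.struct().fields());
    fields.addAll(identityPartitionSchema.columns());
    GenericRecord row = GenericRecord.create(new Schema(fields));

    // Copy the data columns read from the file.
    for (Types.NestedField field : record.struct().fields()) {
      row.setField(field.name(), record.getField(field.name()));
    }

    // Fill identity partition columns from the partition tuple.
    List<PartitionField> partitionFields = spec.fields();
    for (int pos = 0; pos < partitionFields.size(); pos += 1) {
      PartitionField field = partitionFields.get(pos);
      if ("identity".equals(field.transform().toString())) {
        row.setField(field.name(), partition.get(pos, Object.class));
      }
    }
    return row;
  }
}
```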
@rdsr, this looks almost ready to me. The main thing is to add the MR tests that use a mini-DFS and mini-MR cluster.
@rdblue, regarding mini-DFS and mini-MR: I'm facing classpath issues as explained here: #843 (comment). I've also outlined an approach for adding a test using iceberg-mr-runtime and iceberg-mr-runtime-test. Let me know if the mini-DFS can be addressed in a separate PR.
Force-pushed from 1aef1bd to 7cb3dd0.
Looks good! I merged this. Thanks for the contributions and reviews, everyone!
This addresses #170
The current code is pretty rough. I'm sending to get early feedback on the approach.
Most of the scaffolding is there. There's support for parameterized in-memory records: all read-value functions for data formats like Avro, Parquet, and ORC are supported through the `ReadSupport` interface. The same interface also allows adding identity partition columns to the input row. I supported parameterized input records because much of the use of the InputFormat API is for supporting MR-based engines like Pig and Hive.
I need feedback on the `ReadSupport` API, to see whether folks are OK with it. Detailed code feedback can be omitted for now. Once people are OK with the high-level approach, I will clean up the code and add tests. Feedback is welcome on testing the InputFormat as well.

cc @rdblue @teabot @jerryshao @massdosage
This patch is based on the work of @guilload