Fix Iceberg Reader for nested partitions (#575) #585
Conversation
HadoopTables tables = new HadoopTables(new Configuration());
PartitionSpec spec = PartitionSpec.builderFor(nestedSchema)
    .identity("id")
    .identity("nestedData.moreData")
I'm wondering whether we should also fix partition field names and not allow dots in them. The partition spec schema is used in the `DataFile` schema, which is used by the `ManifestReader`.

Iceberg schemas support the `indexByName` API, which maps a source column name to its field id. The column name is canonical, starting from the root and containing all parent field names separated by dots.

@rdblue do you think this could somehow break the Schema `indexByName` API?
This dot issue in column names is fixed on the writer side by:
- Issue: After writing Iceberg dataset with nested partitions it cannot be read anymore #575
- PR: Support parsing of special characters in TypeToSchema visitor #220

I would say that, to be consistent, we should unify this on both the writer and the reader; that is what this PR tries to do. If we allow dots on write, the reader should also work.
Dots are allowed in all names. This is possible because we look up columns using indexByName, which has the nested representation of all field names, like "location.lat".
To build the lookup index, field names in the schema are converted once to the fully-nested name. For example, { "location": {"lat": ..., "long": ...}, "id" }
has 3 leaf names: location.lat, location.long, and id. Note that "location.lat" could be a top-level field with a dot in the name. That's allowed as long as there isn't a conflicting nested field. If there were a conflict, then "location.lat" would refer to two fields and building the index fails.
Using this index, we can always get the original field ID without ever needing to parse a field name using dot and traverse levels.
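As a small illustration of that lookup (a minimal sketch, not code from this PR), using the `location`/`id` example above:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class NestedNameLookup {
  public static void main(String[] args) {
    // { "location": {"lat", "long"}, "id" } from the example above
    Schema schema = new Schema(
        Types.NestedField.required(1, "location", Types.StructType.of(
            Types.NestedField.required(2, "lat", Types.DoubleType.get()),
            Types.NestedField.required(3, "long", Types.DoubleType.get()))),
        Types.NestedField.required(4, "id", Types.LongType.get()));

    // findField uses the index of fully-nested names, so no dot parsing is needed
    System.out.println(schema.findField("location.lat").fieldId());  // 2
    System.out.println(schema.findField("id").fieldId());            // 4
  }
}
```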
@@ -81,13 +81,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
    List<Types.NestedField> expectedFields = struct.fields();
    for (int i = 0; i < expectedFields.size(); i += 1) {
      Types.NestedField field = expectedFields.get(i);
+     String sanitizedFieldName = AvroSchemaUtil.sanitize(field.name());
When converting an Iceberg schema to an Avro schema, if a field is ever sanitized, the original field name is stored as a field property under `AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP`. Should we use that original name if it is set, instead of sanitizing all Iceberg fields before comparison?
I tried your suggestion by getting the original name of the field with `field.getProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP)`, but the `expectedField` doesn't have it. `getProp` returns null if the field has not been sanitized, because `ICEBERG_FIELD_NAME_PROP` is not set in that case. This leads to more checks in all the conditions below.

First, I guess we will still have to check whether the `name`s are different for the cases where no sanitization is applied. Second, we will have to check whether `ICEBERG_FIELD_NAME_PROP` itself is null.

Now, if `AvroSchemaUtil.sanitize` is a pure function, and the sanitization cases are correctly defined and meant to be applied everywhere a field is `!validAvroName`, there should not be any issues using it over the original fields in other parts of the code; the output should be the same.

What do you think? Should I change this even though the code will be less readable due to the extra checks?
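For reference, a rough sketch of what those extra checks might look like, assuming the Avro field is available as `avroField` and the Iceberg field as `expected` (hypothetical names, not this PR's code):

```java
// Hypothetical helper illustrating the extra checks discussed above; not the PR's code.
private static boolean namesMatch(Types.NestedField expected, org.apache.avro.Schema.Field avroField) {
  String originalName = avroField.getProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP);
  if (originalName != null) {
    // the Avro field was sanitized on write, so compare against the stored original name
    return originalName.equals(expected.name());
  }
  // no sanitization happened and the property is not set; sanitize before comparing
  return AvroSchemaUtil.sanitize(expected.name()).equals(avroField.name());
}
```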
Two minor comments below.
spark/src/main/java/org/apache/iceberg/spark/source/Reader.java (two resolved comments)
Force-pushed from 607b7cb to 21bbc00.
As per the spec, this PR allows a column nested in a `Struct` to be part of a `PartitionSpec`.
LGTM.
I don't think that this fix is correct. It looks like this allows creating a projection schema with a … Say I have records like this: …

I think the solution is to update the Spark code to use constant readers, like we recently added to Pig. The current Spark implementation creates a join row of partition data that must be flat, and then combines it with a row that is materialized from the data file. Instead, we need to materialize the entire record, but instead of reading the partition values, we want to set them to constants. That's what the `ConstantReader` in Pig does: instead of performing a read from the file to get the data, it just returns the known partition value. This works to fix the current problem because the constant reader sits at the right place in the value reader tree.

Here's where the constant readers were added to Pig: d158818. That approach should work for Spark as well.
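To make the idea concrete, here is a minimal conceptual sketch of a constant reader. This is not Iceberg's actual reader interface, just an illustration of the technique: a node in the value-reader tree that returns the known partition value instead of decoding the column from the file.

```java
// Conceptual sketch only; Iceberg's real ParquetValueReader interfaces look different.
interface ValueReader<T> {
  T read();
}

final class ConstantReader<T> implements ValueReader<T> {
  private final T constant;

  ConstantReader(T constant) {
    this.constant = constant;  // the partition value, known from table metadata
  }

  @Override
  public T read() {
    // never touches the data file; always produces the known partition value
    return constant;
  }
}
```

A struct reader for a nested field that is used as an identity partition source would then plug a `ConstantReader` into the position of that leaf and regular column readers into the other positions, so the nested struct is assembled in a single pass with no flat join row.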
@@ -396,7 +397,15 @@ public void close() throws IOException {
    // schema or rows returned by readers
    Schema finalSchema = expectedSchema;
    PartitionSpec spec = task.spec();
-   Set<Integer> idColumns = spec.identitySourceIds();
+   Set<Integer> idColumns = Sets.newHashSet();
Can you check whether this adds latency for very large schemas? This loop is on the order of the size of the schema.
-   Set<Integer> idColumns = spec.identitySourceIds();
+   Set<Integer> idColumns = Sets.newHashSet();
+   for (Integer i : spec.identitySourceIds()) {
can you add a comment on why this check is needed?
With this we check for nested partitions and remove them from the right side of the join (the partition side). We do this because the left side of the join already contains the nested values: Spark doesn't remove nested fields even when you specify `partitionBy("nested.field")`.

So, when doing `partitionBy("nested.field")` on a simple Spark data frame and writing it as Parquet, the result is that the file paths contain the partition `.../nested.field=<value>/...` while the nested field is still present inside the data itself (inside the `.parquet` file).

In short, both the left and the right side of the join contain that data, so we remove it from the right side and leave it on the left side (where it is already correctly nested) by not filtering it out.

@rdblue This explanation may throw some light on the implementation.
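A hedged sketch of that check (the exact condition in the PR may differ, and this reuses variables from the surrounding Reader code): keep only identity partition sources that are top-level columns, and leave nested ones to be read from the data file.

```java
// Sketch of the filtering described above; not necessarily the PR's exact code.
Set<Integer> idColumns = Sets.newHashSet();
for (Integer sourceId : spec.identitySourceIds()) {
  Types.NestedField sourceField = finalSchema.findField(sourceId);
  // keep only top-level identity partition columns; nested partition sources stay
  // on the data (left) side of the join because they are still present in the file
  if (sourceField != null && finalSchema.columns().contains(sourceField)) {
    idColumns.add(sourceId);
  }
}
```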
I think I see. Sounds like you're decoding the data written in data files, which keeps the existing structure. Is that right?
I'd still prefer to use the `ConstantReader` approach. Iceberg writes partition values to data files, but files imported from Hive tables -- which we do support -- won't have those values, and those columns would be missing from the final data.

Another big benefit of that approach is that we can get rid of the unsafe projection that puts the columns back in the expected order. Being able to do that would be a performance win.
@rdblue I'll try to use a unit test over Hive to see the issue you are describing.

On the other hand, I tried joining nested fields using the `JoinedRow` construct, but it didn't suffice.

Given the following original row:
{
"id": 1,
"otherField": "otherVal",
"nestedData": {
"nestedKey1": "val1",
"nestedKey2": "val2"
}
}
And writing it as Iceberg with the following two partitions: `identity(id)` and `identity(nestedData.nestedKey1)`.

Reading it back with the Iceberg Reader would mean creating the row as a join between the partition values (i.e., `1` and `val1`) and the data (i.e., `otherVal` and `val2`).
So, joining the following partition values
{
"id": 1,
"nestedData": {
"nestedKey1": "val1"
}
}
and data
{
"otherField": "otherVal",
"nestedData": {
"nestedKey2": "val2"
}
}
would end as
{
"id": 1,
"nestedData": {
"nestedKey1": "val1"
},
"otherField": "otherVal",
"nestedData": {
"nestedKey2": "val2"
}
}
The above is from what I've seen while putting together this fix.
But we would like it to be like this:
{
"id": 1,
"otherField": "otherVal",
"nestedData": {
"nestedKey1": "val1",
"nestedKey2": "val2"
}
}
Right?
I don't know how the `ConstantReader` would help compose back that `nestedData` field as it originally was.
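For what it's worth, a conceptual sketch of how a constant reader composes the nested struct back (reusing the hypothetical `ValueReader`/`ConstantReader` types sketched earlier, not Iceberg's API): the constant sits at the `nestedKey1` position inside the struct reader, so `nestedData` is rebuilt in place rather than joined flat.

```java
// Conceptual sketch only, continuing the hypothetical ValueReader/ConstantReader above.
import java.util.LinkedHashMap;
import java.util.Map;

final class StructReader implements ValueReader<Map<String, Object>> {
  private final Map<String, ValueReader<?>> fieldReaders;

  StructReader(Map<String, ValueReader<?>> fieldReaders) {
    // e.g. nestedKey1 -> ConstantReader("val1"), nestedKey2 -> reader over the file column
    this.fieldReaders = fieldReaders;
  }

  @Override
  public Map<String, Object> read() {
    Map<String, Object> struct = new LinkedHashMap<>();
    // each leaf produces its value in place, so the nested structure is preserved
    fieldReaders.forEach((name, reader) -> struct.put(name, reader.read()));
    return struct;
  }
}
```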
> Iceberg writes partition values to data files, but files imported from Hive tables -- which we do support -- won't have those values and those columns will be missing from the final data

@rdblue can you point me to any unit test or snippet of code showing that flow -- loading Hive table data? I'm asking so I can have a starting point, as my experience with Hive is not at the same level as with Parquet. Thank you.
Sorry for the late reply!

Partitioned tables converted using `importSparkTable` won't have partition columns embedded in the data files.

But that's not the only reason to use the `ConstantReader` approach. As I mentioned, using a `ConstantReader` will allow us to produce the schema that is requested by Spark without using a `JoinedRow` and a projection. The projection is expensive, so fixing this with a `ConstantReader` and removing the projection is a much better solution.
@rdblue If you look at this line (https://github.com/apache/incubator-iceberg/pull/585/files#diff-e6baf84fa0dea08f3457433c123f784fR393) in the added test, you'll see that the partition is not flattened as you expected. Given your example, the output would be: …
Force-pushed from a53db54 to 8dc35a7.
    // detect reordering
-   if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
+   if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) {
Why is sanitization necessary if this fix works by removing columns from the partition columns that get attached via `JoinedRow`?
This is happening before getting to the materialized data. The column matching at read time was completely missing the sanitization logic that is used at write time.
@andrei-ionescu, let me know if you need any assistance with this. I think it will be great to get this fixed and remove the hacky unsafe projections in Spark!

@rdblue I'll try to change the code so that it won't need the unsafe projection anymore. I'll try to have that as a separate commit. If I get stuck I'll get back to you. Thanks for the support.
Force-pushed from 8dc35a7 to 9bb71dd.
@rdblue I added a new commit with the implementation without the unsafe projection. Can you check it, please?
@@ -19,11 +19,13 @@
    package org.apache.iceberg.spark.source;

+   import com.google.common.base.Function;
We use `java.util.function` classes instead of the Guava ones.
👍
Force-pushed from 9bb71dd to c6f087f.
@rdblue I modified the code and used a lambda function instead of the Guava version. I also rebased it and resolved conflicts. Please have another look when you have some spare time. Thank you.
if (ref > internalRow.numFields()) {
  internalRow.setNullAt(ref);
}
internalRow.update(ref, ((GenericInternalRow) partition).genericGet(index));
Looks like what you're trying to do is match up the partition fields with fields in the schema, using the field index for data coming from the partition and the source ID to recover an index into the output row.

Unfortunately, there are several problems with this approach. The biggest one is that the source ID can't be used to produce an index. This may appear to work when all source fields are projected, but if you don't project a field, then that index is off by 1. It also assumes that IDs are assigned a certain way, and there is no guarantee for that in the format. Reordering fields also causes problems with this approach.

These problems can be fixed by using the design I recommended earlier: this should build a map of source ID to partition value, then pass that map in when building the reader tree, so that the tree can use constant readers instead of primitive readers. Here's how it happens in Pig: https://github.com/apache/incubator-iceberg/blob/master/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java#L156-L160
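A hedged sketch of building that map from the task's data file, using only identity partition fields (variable names follow the surrounding Reader code where possible, but this is not the PR's exact code):

```java
// Sketch of the suggested approach: map identity partition source IDs to their
// constant values from the file's partition tuple. Not the PR's exact code.
Map<Integer, Object> idToConstant = Maps.newHashMap();
StructLike partitionData = file.partition();
List<PartitionField> partitionFields = spec.fields();
for (int pos = 0; pos < partitionFields.size(); pos += 1) {
  PartitionField partitionField = partitionFields.get(pos);
  // only identity transforms carry the source value unchanged
  if ("identity".equals(partitionField.transform().toString())) {
    idToConstant.put(partitionField.sourceId(), partitionData.get(pos, Object.class));
  }
}
// the map is then passed to the reader builder so it can substitute constant readers
// for those field IDs instead of primitive column readers
```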
Force-pushed from c6f087f to 6a92340.
@rdblue I think I understood what you wanted. Please check and let me know if it's going in the right direction.

@andrei-ionescu, thanks for the update! I'll review it in the next couple days.
Force-pushed from 6a92340 to 4db8832.
The current direction of the PR looks like the right one to me. Thanks!
@@ -116,7 +117,7 @@ public void tearDownBenchmark() {
    public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
      try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
          .project(SCHEMA)
-         .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+         .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type, Collections.emptyMap()))
Can we add a `buildReader` method that defaults the map to empty? Then we wouldn't need to change this file at all.
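Something like this overload, sketched under the assumption that the three-argument `buildReader` takes a constants map (the exact signature in `SparkParquetReaders` may differ):

```java
// Hypothetical overload; the real signature in SparkParquetReaders may differ slightly.
public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
                                                          MessageType fileSchema) {
  // delegate with an empty constants map so existing callers keep working unchanged
  return buildReader(expectedSchema, fileSchema, Collections.emptyMap());
}
```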
👍
    Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
    PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
    JoinedRow joined = new JoinedRow();
+   GenericInternalRow partition = (GenericInternalRow) convertToRow.apply(file.partition());
I'd rather have this go directly from `file.partition()` to the partition map. The problem with using the old partition row is that this has to account for the row's schema as well as the partition schema. It would be easier to avoid this and use a conversion function for values, instead of relying on `PartitionRowConverter` to do it.
👍
@rdblue, I merged your PR and added … Let me know how I can help.
Force-pushed from e0c0d16 to e43ceae.
@andrei-ionescu, to get this in, we need the updated logic to be applied only to Parquet. Avro and ORC should still use the joined row approach because they have not been updated. Can you make that change?

@rdblue, are there any tests for these cases (ORC, Avro)? Or should I add them? If so, can you point me to a starting point for those tests?

I think we need new tests. I'd start with a test case for each format that writes a few simple records with identity-partitioned columns. The tests should be able to add a partition column that isn't in the file, and should validate that data from the file's partition tuple overrides what's in the file (if it is a different value).
Force-pushed from 6583afa to 1b3e39f.
I added a nested partition test for Avro similar to the one for Parquet. Let me know if it's enough or not. For the ORC format, the writer doesn't support it, as seen in Writer.java#L289-L308. Do you want me to try addressing this missing ORC writer feature in a separate PR?

@rdblue Please don't forget about this PR. Thank you.
@andrei-ionescu, this hasn't been updated to handle Avro and Parquet differently. This changes Parquet to use partition values from each DataFile, but it does not update Avro (or ORC). That's why Avro needs to use the same logic as before, with a joined row. This should also test that reading a data file without the partition columns works for both Avro and Parquet.
I don't fully understand what you mean and what you're asking. I added a test that uses Avro as the underlying storage format and it works very well without any changes. What's the case/test that requires the change back to joining rows? I'll add the test for Avro without any partitions as you suggest. I guess I'm missing something that you are aware of, and that is why I need more information.
@andrei-ionescu, this PR changes the way partition values are added to result records for Parquet. It does not change Avro or ORC. That means that the changes in Reader that remove the `JoinedRow` logic would break those formats, which still rely on it.
I did not follow this thread but wanted to mention this Spark PR as it might be relevant.
@aokolnychyi, it isn't clear if that Spark PR is attempting to disable this for just sources that don't support it, or for all sources. I don't think there is a reason to disallow it for all sources. The problem here is just record materialization, which we can fix. The problem on that PR is that some sources don't work and should have checks to reject partitioning by nested columns. There's also a note about HiveQL syntax, but explicitly using partitions in a query is not recommended for Iceberg -- users should instead overwrite by an expression in terms of the data columns.
@andrei-ionescu, I just updated the Avro read path with the same change that you're making here, in #896. The only outstanding task on this PR was to avoid breaking the other formats (Avro and ORC), and that is done by my PR here: https://github.com/apache/incubator-iceberg/pull/896/files#diff-b9074b510418ffff8aebbaaa95685541R104. After that goes into master, could you update this PR by adding Parquet to the …?
I'm closing this one because I've made the Parquet changes in #909 and included @andrei-ionescu as a co-author.
This is a follow-up to apache#896, which added the same constant map support for Avro. Fixes apache#575 for Parquet and replaces apache#585. Andrei did a lot of the work for this in apache#585.

Co-authored-by: Andrei Ionescu <webdev.andrei@gmail.com>
* Sync with OSS PRs
* Fix typo
This is a fix for #575 — After writing Iceberg dataset with nested partitions it cannot be read anymore.