Fix Iceberg Reader for nested partitions (#575) #585


Conversation

andrei-ionescu
Contributor

This is a fix for #575. After writing an Iceberg dataset with nested partitions, it cannot be read back anymore.

@andrei-ionescu
Contributor Author

HadoopTables tables = new HadoopTables(new Configuration());
PartitionSpec spec = PartitionSpec.builderFor(nestedSchema)
.identity("id")
.identity("nestedData.moreData")
Contributor

@rdsr Oct 28, 2019

I'm wondering whether we should also fix this for partition field names and not allow dots in them. The partition spec schema is used in the DataFile schema, which is used by the ManifestReader.
Iceberg schemas support an indexByName API that maps a source column name to its field ID. The column name is canonical, starting from the root and containing all parent field names separated by dots.

@rdblue, do you think this could somehow break the Schema indexByName API?

Contributor Author

This dot issue in column names is fixed on the writer side by:

I would say that, to be consistent, we should unify this on both the writer and the reader. That is what this PR tries to do: if we allow dots on write, the reader should also work.

Contributor

Dots are allowed in all names. This is possible because we look up columns using indexByName, which has the nested representation of all field names, like "location.lat".

To build the lookup index, field names in the schema are converted once to the fully-nested name. For example, { "location": {"lat": ..., "long": ...}, "id" } has 3 leaf names: location.lat, location.long, and id. Note that "location.lat" could be a top-level field with a dot in the name. That's allowed as long as there isn't a conflicting nested field. If there were a conflict, then "location.lat" would refer to two fields and building the index fails.

Using this index, we can always get the original field ID without ever needing to parse a field name using dot and traverse levels.
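
For illustration, a minimal sketch of that lookup with the Iceberg Schema API (field IDs are arbitrary and the class name is only for the example):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class NestedNameLookupExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "location", Types.StructType.of(
            Types.NestedField.required(3, "lat", Types.DoubleType.get()),
            Types.NestedField.required(4, "long", Types.DoubleType.get()))));

    // The index maps each canonical nested name to its field, so the original
    // field ID is recovered without ever splitting the name on dots.
    Types.NestedField lat = schema.findField("location.lat");
    System.out.println(lat.fieldId());  // prints 3
  }
}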

@@ -81,13 +81,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
List<Types.NestedField> expectedFields = struct.fields();
for (int i = 0; i < expectedFields.size(); i += 1) {
Types.NestedField field = expectedFields.get(i);
String sanitizedFieldName = AvroSchemaUtil.sanitize(field.name());
Contributor

@rdsr Oct 28, 2019

When converting an Iceberg schema to an Avro schema, if a field is sanitized, the original field name is stored as a field property under AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP. Should we use that original name when it is set, instead of sanitizing all Iceberg fields before comparison?

Contributor Author

I tried your suggestion of getting the original name of the field with field.getProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP), but the expectedField doesn't have it. getProp returns null if the field has not been sanitized, because ICEBERG_FIELD_NAME_PROP is not set in that case. This leads to more checks in all of the conditions below.

First, I guess we will still have to check whether the names are different for the cases where no sanitization is applied.
Second, we will have to check whether ICEBERG_FIELD_NAME_PROP itself is null.

Now, if AvroSchemaUtil.sanitize is a pure function and the sanitization cases are correctly defined and intended to be applied everywhere a field is !validAvroName, there should not be any issue using it elsewhere on the original fields: the output should be the same.

What do you think? Should I change this even though the code will be less readable due to checks?
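
For concreteness, the alternative being discussed would look roughly like this (a sketch only; avroField stands for the matching field of the Avro record schema and is an assumed name, not code from this PR):

// Recover the original Iceberg name from the Avro field if it was sanitized;
// getProp returns null when no sanitization happened, so fall back to the
// Avro field name itself before comparing against the expected Iceberg name.
String originalName = avroField.getProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP);
String avroSideName = originalName != null ? originalName : avroField.name();
boolean matches = avroSideName.equals(field.name());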

Contributor

@xabriel left a comment

Two minor comments below.

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch 2 times, most recently from 607b7cb to 21bbc00 on October 30, 2019 08:23
Contributor

@xabriel left a comment

As per the spec, this PR allows a column nested in a struct to be part of a PartitionSpec.

LGTM.

@rdblue
Contributor

rdblue commented Oct 30, 2019

I don't think that this fix is correct. It looks like this allows creating a projection schema with a . in the name, but that doesn't actually return the data in its original structure.

Say I have records like this: {"id": 1, "info": {"type": "x", ...} } and I want to store those records by identity(info.type) and bucket(id, 16). When I read the data, I should get the original record back, with "type" nested within "info". But this approach instead would return {"id": 1, "info.type": "x", "type": ...}. Right?

I think the solution is to update the Spark code to use constant readers, like we recently added to Pig.

The current Spark implementation creates a join row of partition data that must be flat, and then combines it with a row that is materialized from the data file. Instead, we need to materialize the entire record, but rather than reading the partition values from the file, we want to set them to constants. That's what the ConstantReader in Pig does: instead of performing a read from the file to get the data, it just returns the known partition value. This fixes the current problem because the constant reader sits at the right place in the value reader tree, which looks like this:

StructReader(
  LongReader(pos=0, name="id", decoder=...),
  StructReader(name="info",
    ConstantReader(pos=0, name="type", value="x"),
    StringReader(pos=1, decoder=...),
    ...
  )
);
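
As a rough illustration of the idea only (the interface below is simplified and hypothetical, not the actual Iceberg or Pig reader API), a constant reader skips decoding entirely and just returns the known partition value:

// Simplified, hypothetical reader interface for illustration.
interface ValueReader<T> {
  T read();
}

// Returns the known partition value instead of decoding it from the file.
class ConstantReader<T> implements ValueReader<T> {
  private final T constant;

  ConstantReader(T constant) {
    this.constant = constant;
  }

  @Override
  public T read() {
    return constant;
  }
}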

@rdblue
Contributor

rdblue commented Oct 30, 2019

Here's where the constant readers were added to Pig: d158818

That approach should work for Spark as well.

@@ -396,7 +397,15 @@ public void close() throws IOException {
// schema or rows returned by readers
Schema finalSchema = expectedSchema;
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();

Set<Integer> idColumns = Sets.newHashSet();
Contributor

Can you check whether this adds latency for very large schemas? This is on the order of the size of the schema.

Set<Integer> idColumns = spec.identitySourceIds();

Set<Integer> idColumns = Sets.newHashSet();
for (Integer i : spec.identitySourceIds()) {
Contributor

can you add a comment on why this check is needed?

Contributor Author

@andrei-ionescu Oct 31, 2019

With this we check for nested partitions and remove them from the right join side (the partitions side). We do this because the left join side already contains the nested values: Spark doesn't remove the nested fields even though you specify partitionBy("nested.field").

So, when doing partitionBy("nested.field") on a simple Spark data frame and writing it as Parquet, the result is that the file paths contain the partition .../nested.field=<value>/... while the nested field is still present inside the data itself (inside the .parquet file).

In short, both the left and the right join side contain that data, so we remove it from the right side and keep it in the left side (where it is already correctly nested) by not filtering it out.

@rdblue This explanation may shed some light on the implementation.

Contributor

@rdblue Oct 31, 2019

I think I see. Sounds like you're decoding the data written in data files, which keeps the existing structure. Is that right?

I'd still prefer to use the ConstantReader approach. Iceberg writes partition values to data files, but files imported from Hive tables -- which we do support -- won't have those values and those columns will be missing from the final data.

Another big benefit to that approach is that we can get rid of the unsafe projection that puts the columns back in the expected order. Being able to do that would be a performance win.

Contributor Author

@rdblue I'll try using a unit test over Hive to see the issue you are describing.

On the other hand, I tried joining nested fields using the JoinedRow construct, but it didn't suffice.

Given the following original row:

{
  "id": 1,
  "otherField": "otherVal", 
  "nestedData": { 
    "nestedKey1": "val1",  
    "nestedKey2": "val2" 
  } 
}

And writing it as Iceberg with the following 2 partitions: identity(id) and identity(nestedData.nestedKey1).

Reading it back with the Iceberg reader would mean creating the row as a join between the partition values (i.e., 1 and val1) and the data (i.e., otherVal and val2).

So, joining the following partition values

{ 
  "id": 1, 
  "nestedData": {
    "nestedKey1": "val1" 
  } 
}

and data

{ 
  "otherField": "otherVal", 
  "nestedData": { 
    "nestedKey2": "val2" 
  }
}

would end up as

{ 
  "id": 1, 
  "nestedData": { 
    "nestedKey1": "val1" 
  },  
  "otherField": "otherVal", 
  "nestedData": { 
    "nestedKey2": "val2" 
  } 
}

The above is from what I've seen while putting together this fix.

But we would like it like this:

{ 
  "id": 1, 
  "otherField": "otherVal", 
  "nestedData": { 
    "nestedKey1": "val1",  
    "nestedKey2": "val2" 
  } 
}

Right?

I don't know how the ConstantReader would help compose that nestedData field back into its original form.

Contributor Author

Iceberg writes partition values to data files, but files imported from Hive tables -- which we do support -- won't have those values and those columns will be missing from the final data

@rdblue can you point me to any unit test or snippet of code showing that flow -- loading Hive table data? I'm asking so I can have a starting point, as my experience with Hive is not at the same level as with Parquet. Thank you.

Contributor

Sorry for the late reply!

Partitioned tables converted using importSparkTable won't have partition columns embedded in the files.

But that's not the only reason to use the ConstantReader approach. As I mentioned, using a ConstantReader will allow us to produce the schema that is requested by Spark without using a JoinedRow and a projection. The projection is expensive, so fixing this using a ConstantReader and removing the projection is a much better solution.

@andrei-ionescu
Contributor Author

andrei-ionescu commented Oct 31, 2019

@rdblue If you look at this line (https://github.com/apache/incubator-iceberg/pull/585/files#diff-e6baf84fa0dea08f3457433c123f784fR393) in the added test, you'll see that the partition is not flattened as you expected.

Given your example, the output would be: {"id": 1, "info": {"type": "x"} }. The test checks exactly this: it makes sure that we keep the nesting as it is.

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch 2 times, most recently from a53db54 to 8dc35a7 on October 31, 2019 11:01

// detect reordering
if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) {
Contributor

Why is sanitization necessary if this fix works by removing columns from the partition columns that get attached via JoinedRow?

Contributor Author

This happens before getting to the materialized data. The column matching at read time was completely missing the sanitization logic that is used at write time.

@rdblue
Contributor

rdblue commented Nov 11, 2019

@andrei-ionescu, let me know if you need any assistance with this. I think it will be great to get this fixed and remove the hacky unsafe projections in Spark!

@andrei-ionescu
Contributor Author

@rdblue I'll try to change the code so that it won't need the unsafe projection anymore. I'll try to have that as a separate commit. If I get stuck I'll get back to you. Thanks for the support.

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from 8dc35a7 to 9bb71dd on November 13, 2019 14:21
@andrei-ionescu
Contributor Author

@rdblue I added a new commit with the implementation without the unsafe projection. Can you check it please?

@@ -19,11 +19,13 @@

package org.apache.iceberg.spark.source;

import com.google.common.base.Function;
Contributor

We use java.util.function classes instead of the Guava ones.

Contributor Author

👍

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from 9bb71dd to c6f087f on November 14, 2019 09:09
@andrei-ionescu
Contributor Author

@rdblue I modified the code and used a lambda function instead of the Guava version. I also rebased it and resolved conflicts. Please have another look when you have some spare time. Thank you.

if (ref > internalRow.numFields()) {
  internalRow.setNullAt(ref);
}
internalRow.update(ref, ((GenericInternalRow) partition).genericGet(index));
Contributor

@rdblue Nov 16, 2019

Looks like what you're trying to do is match up the partition fields with fields in the schema, using the field index for data coming from the partition and the source ID to recover an index into the output row.

Unfortunately, there are several problems with this approach. The biggest one is that the source ID can't be used to produce an index. This may appear to work when all source fields are projected, but if you don't project a field, then that index is off by 1. It also assumes that IDs are assigned a certain way, and there is no guarantee of that in the format. Reordering fields also causes problems with this approach.

The problems can be fixed by using the design I recommended earlier: this should build a map of source ID to partition value. Then it should pass that map in when building the reader tree, so that the tree can use constant readers instead of primitive readers. Here's how it happens in Pig: https://github.com/apache/incubator-iceberg/blob/master/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java#L156-L160
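
Roughly, the idea is something like this (a sketch only; variable names are illustrative and the conversion of values to Spark's internal representation is omitted):

// Build a map from identity-partition source field ID to its constant value,
// then pass it to the reader builder so constant readers replace the
// primitive readers at those positions in the value reader tree.
Map<Integer, Object> idToConstant = Maps.newHashMap();
List<PartitionField> partitionFields = spec.fields();
StructLike partitionData = file.partition();
for (int pos = 0; pos < partitionFields.size(); pos += 1) {
  PartitionField partitionField = partitionFields.get(pos);
  if ("identity".equals(partitionField.transform().toString())) {
    idToConstant.put(partitionField.sourceId(), partitionData.get(pos, Object.class));
  }
}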

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from c6f087f to 6a92340 on November 20, 2019 19:54
@andrei-ionescu
Contributor Author

@rdblue I think I understood what you wanted. Please check and let me know if it's going in the right direction.

@rdblue
Contributor

rdblue commented Nov 20, 2019

@andrei-ionescu, thanks for the update! I'll review it in the next couple days.

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from 6a92340 to 4db8832 on November 21, 2019 08:33
@rdblue
Contributor

rdblue commented Nov 21, 2019

The current direction of the PR looks like the right one to me. Thanks!

@@ -116,7 +117,7 @@ public void tearDownBenchmark() {
public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
.project(SCHEMA)
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type, Collections.emptyMap()))
Contributor

Can we add a buildReader method that defaults the map to empty? Then we wouldn't need to change this file at all.
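
Something like this sketch would do it (the return type and parameter names are assumed from the existing three-argument call above, not taken from the final code):

// Overload that defaults the constant map to empty so existing callers,
// like this benchmark, don't need to change.
public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
                                                          MessageType fileSchema) {
  return buildReader(expectedSchema, fileSchema, Collections.emptyMap());
}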

Contributor Author

👍

Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
JoinedRow joined = new JoinedRow();
GenericInternalRow partition = (GenericInternalRow) convertToRow.apply(file.partition());
Contributor

I'd rather have this go directly from file.partition() to the partition map. The problem with using the old partition row is that this has to account for the row's schema as well as the partition schema. It would be easier to avoid this and use a conversion function for values instead of relying on PartitionRowConverter to do it.

Contributor Author

👍

@andrei-ionescu
Contributor Author

@rdblue, I merged your PR and added a day partition to the nested-partitions unit test.

Let me know how I can help.

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from e0c0d16 to e43ceae on December 5, 2019 17:02
@rdblue
Contributor

rdblue commented Dec 5, 2019

@andrei-ionescu, to get this in, we need the updated logic to be applied only to Parquet. Avro and ORC should still use the joined row approach because they have not been updated. Can you make that change?

@andrei-ionescu
Contributor Author

andrei-ionescu commented Dec 5, 2019

@rdblue, are there any tests for these cases (ORC, Avro)? Or should I add them? If so, can you point me to a starting point for those tests?

@rdblue
Contributor

rdblue commented Dec 5, 2019

I think we need new tests.

I'd start with a test case for each format that writes a few simple records with identity-partitioned columns. The tests should be able to add a partition column that isn't in the file and should validate that data from the file partition tuple overrides what's in the file (if it is a different value).

@andrei-ionescu force-pushed the ai.fix_reader_nested_partitions branch from 6583afa to 1b3e39f on December 6, 2019 11:54
@andrei-ionescu
Contributor Author

@rdblue,

I added a nested partition test for Avro similar to the one for Parquet. Let me know if it's enough or not.

For the ORC format, the writer doesn't support it, as seen in Writer.java#L289-L308. Do you want me to try addressing this missing ORC writer feature in a separate PR?

@andrei-ionescu
Contributor Author

@rdblue Please don't forget about this PR. Thank you.

@rdblue
Contributor

rdblue commented Dec 9, 2019

@andrei-ionescu, this hasn't been updated to handle Avro and Parquet differently. This changes Parquet to use partition values from each DataFile, but it does not update Avro (or ORC). That's why Avro needs to use the same logic as before, with a joined row. This should also test that reading a data file without the partition columns works for both Avro and Parquet.

@andrei-ionescu
Contributor Author

@rdblue,

I don't fully understand what you mean and what you're asking. I added a test that uses Avro as the underlying storage format and it works very well without any changes. What's the case/test that requires the change back to joining rows?

I’ll add the test for Avro without any partitions as you suggest.

I guess I’m missing something that you are aware of and that is why I need more information.

@rdblue
Contributor

rdblue commented Dec 18, 2019

@andrei-ionescu, this PR changes the way partition values are added to result records for Parquet. It does not change Avro or ORC. That means that the changes in Reader that remove the JoinedRow should not be applied to Avro or ORC. This PR needs to make changes just to the Parquet read, not for Avro or ORC.

@aokolnychyi
Contributor

I did not follow this thread but wanted to mention this Spark PR as it might be relevant.

@rdblue
Contributor

rdblue commented Jan 6, 2020

@aokolnychyi, it isn't clear if that Spark PR is attempting to disable this for just sources that don't support it, or for all sources. I don't think there is a reason to disallow it for all sources. The problem here is just record materialization, which we can fix. The problem on that PR is that some sources don't work and should have checks to reject partitioning by nested columns. There's also a note about HiveQL syntax, but explicitly using partitions in a query is not recommended for Iceberg -- users should instead overwrite by an expression in terms of the data columns.

@rdblue
Contributor

rdblue commented Apr 6, 2020

@andrei-ionescu, I just updated the Avro read path in #896 with the same change that you're making here. The only outstanding task on this PR was to avoid breaking the other formats (Avro and ORC), and that is done by my PR here: https://github.com/apache/incubator-iceberg/pull/896/files#diff-b9074b510418ffff8aebbaaa95685541R104.

After that goes into master, could you update this PR by adding Parquet to the SUPPORTS_CONSTANTS format set?

@rdblue
Contributor

rdblue commented Apr 9, 2020

I'm closing this one because I've made the Parquet changes in #909 and included @andrei-ionescu as a co-author.

@rdblue rdblue closed this Apr 9, 2020
rdblue added a commit that referenced this pull request Apr 11, 2020
This is a follow-up to #896, which added the same constant map support for Avro.

Fixes #575 for Parquet and replaces #585. Andrei did a lot of the work for this in #585.

Co-authored-by: Andrei Ionescu <webdev.andrei@gmail.com>
Fokko pushed a commit to Fokko/iceberg that referenced this pull request Apr 21, 2020
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023
* Sync with OSS PRs

* fix typo