Core: Add readable metrics columns to files metadata tables #5376
Conversation
Force-pushed from c06a868 to a6e3cbe.
Force-pushed from 223a3ad to 936e2ea.
All Spark tests are updated/fixed now. FYI @RussellSpitzer @aokolnychyi @rdblue if you guys have time to leave some feedback. There are new tests added, but it's a bit big to show in 'Files Changed': TestMetadataTableMetricsColumns.java
  return Optional.of(
      Transforms.identity(field.type())
          .toHumanString(Conversions.fromByteBuffer(field.type(), value)));
} catch (Exception e) { // Ignore
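For context, a minimal, hedged sketch of the conversion in the hunk above (the class name, literal value, and integer type are made up for illustration):

import java.nio.ByteBuffer;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class ReadableMetricExample {
  public static void main(String[] args) {
    // Encode an int bound the way Iceberg stores single-value metrics, then decode
    // it and render it as a human-readable string via the identity transform.
    ByteBuffer bound = Conversions.toByteBuffer(Types.IntegerType.get(), 42);
    String readable = Transforms.identity(Types.IntegerType.get())
        .toHumanString(Conversions.fromByteBuffer(Types.IntegerType.get(), bound));
    System.out.println(readable); // prints "42"
  }
}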
Do you have examples of when this conversion throws an exception?
Yea, I tried to put a comment, but unfortunately it got disassociated from this line after a rebase.
This happens in some cases. I found it when importing external files into an Iceberg table, i.e. TestIcebergSourceHadoopTables.testFilesTableWithSnapshotIdInheritance, where I think the columns are out of order relative to the original schema and the metrics are corrupt (an underflow exception in this case).
Not sure if we should error out the files table in that case; I was leaning towards just returning null. The user still has the original column to see why the error happened.
@aokolnychyi filed an issue: #5543
Following up on this: it is a non-issue, as the Spark procedures set the flag schema.name-mapping.default; just this test does not. Fixed the test.
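For reference, a hedged sketch of what that flag amounts to in code (the exact call sites in this PR may differ; 'table' is assumed to be an existing org.apache.iceberg.Table whose files were imported from plain Parquet and therefore carry no Iceberg field IDs):

import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

// Build a name mapping from the table schema and register it as the default,
// so columns in ID-less imported files are resolved by name and their metrics
// line up with the table schema.
NameMapping mapping = MappingUtil.create(table.schema());
table.updateProperties()
    .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
    .commit();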
It seems like a great idea to add readable metrics. It is hard to make sense of them otherwise. @szehon-ho, what do you think about adding a single map column, let's say called readable_metrics? We can then easily access the metrics via SQL.
I am okay with individual columns too but it seems a bit cleaner to just have one.
Force-pushed from d1324e3 to 62eb36c.
Let me check in a bit.
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java (outdated comment, resolved)
Let me take a look today.
Looks really close to me.
Let me take a look now.
@@ -148,6 +148,16 @@ public static Schema join(Schema left, Schema right) {
    return new Schema(joinedColumns);
  }

  public static Schema joinCommon(Schema left, Schema right) {
What about simply adapting the existing join method? Are there any scenarios where we want to skip the validation and simply add all columns (old logic)?
  return CloseableIterable.transform(files(projection), file -> (StructLike) file);
} else {
  Schema fileProjection = TypeUtil.selectNot(projection, READABLE_METRICS_FIELD_IDS);
  Schema minProjection =
I think this logic should be part of BaseFilesTableScan and BaseAllFilesTableScan. Otherwise, our scans won't report the correct schema in Scan$schema().
I think putting it there will break the scan, right, as it's not the projection the user requested.
Note, this is actually a bit subtle. Because we are doing the join (original projection + minimum metrics), the file's schema becomes
{any_projected_field_on_file} : {readable_metrics, because it is also projected} : {un-projected but required metrics fields}
So ContentFileWithMetrics works because it discards any of the "un-projected but required metrics fields", given they are outside the range it will read. For the remaining fields it uses the existing logic (delegate to the file for the first n-1 fields, then get the nth field from MetricsStruct).
I mean, we could add a select method to GenericDataFile to modify its internal 'fromProjectionPos' map to conform back to the original projection (dropping the "un-projected but required metrics fields"). But it would mainly be for clarity, and not strictly needed.
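To make that delegation concrete, here is a simplified sketch (not the PR's actual ContentFileWithMetrics; the class and field layout are assumptions): the wrapper answers the first n-1 projected positions from the underlying file and the last position from the derived metrics struct, so the trailing "un-projected but required" fields are simply never read.

import org.apache.iceberg.StructLike;

class ContentFileWithMetricsSketch implements StructLike {
  private final StructLike file;          // projected data/delete file row
  private final StructLike metricsStruct; // derived readable metrics
  private final int projectedSize;        // number of columns the user projected

  ContentFileWithMetricsSketch(StructLike file, StructLike metricsStruct, int projectedSize) {
    this.file = file;
    this.metricsStruct = metricsStruct;
    this.projectedSize = projectedSize;
  }

  @Override
  public int size() {
    return projectedSize;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    // Last projected position is readable_metrics; everything before it comes from
    // the file. Positions past projectedSize (the extra required metrics fields) are
    // outside the reported size and are never requested.
    if (pos == projectedSize - 1) {
      return javaClass.cast(metricsStruct);
    }
    return file.get(pos, javaClass);
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("Not writable");
  }
}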
Added an additional test; it looks like it works even when the readable_metrics column is selected before other columns (Spark somehow asks for the rows in their original order).
@@ -142,9 +142,21 @@ public static Schema selectNot(Schema schema, Set<Integer> fieldIds) {
  }

  public static Schema join(Schema left, Schema right) {
    List<Types.NestedField> joinedColumns = Lists.newArrayList();
    joinedColumns.addAll(left.columns());
nit: This changes the original behavior; why not add a new function?
@chenjunjiedada Yea, that was my original version; I changed it after @aokolnychyi's comment: #5376 (comment)
Technically this is changing a public API which previously would have allowed these combos. This is ... maybe ok since it's a utility method but we may end up breaking users of the function at runtime. That said I think Anton is right and any schema with multiple columns with the same ID would always be wrong.
Really nice PR, thanks @szehon-ho and @aokolnychyi for the effort! When can we merge this? I think it is ready, and it has been two months since the last review, which will lead to more conflicts if we leave it.
Force-pushed from 21c7205 to 0e68ae3.
@RussellSpitzer addressed the comments, thanks!
Actually hold on a second, I'm looking at a small refactor to make it more generic for adding a readable_metric definition in the future.
…ion in READABLE_METRIC_COLS static array
@RussellSpitzer should be good now for another look when you get a chance, thanks!
@@ -486,6 +489,11 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
    inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

    NameMapping mapping = MappingUtil.create(table.schema());
Without these lines, the test used to write the wrong metrics to the imported table.
public static Schema readableMetricsSchema(Schema dataTableSchema, Schema metadataTableSchema) {
  List<Types.NestedField> fields = Lists.newArrayList();
  Map<Integer, String> idToName = dataTableSchema.idToName();
  AtomicInteger nextId =
I don't believe this has to be atomic, and metadataTableSchema should already have a method highestFieldId(), which according to the doc includes nested fields.
If you prefer incrementAndGet() to ++nextId, though, I think using the AtomicInteger just for readability is probably fine.
Ah, the AtomicInteger is there because there is a lambda function in there (the map) and the compiler complains:
"Variable used in lambda expression should be final or effectively final"
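A generic illustration of that constraint, unrelated to the PR code: a local variable captured by a lambda must be effectively final, so a mutable counter needs a holder such as AtomicInteger.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class LambdaCounterExample {
  static List<String> numbered(List<String> names) {
    // int nextId = 0; ... nextId++ inside the lambda would not compile:
    // "Variable used in lambda expression should be final or effectively final"
    AtomicInteger nextId = new AtomicInteger(0);
    return names.stream()
        .map(name -> nextId.incrementAndGet() + ": " + name)
        .collect(Collectors.toList());
  }
}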
Re: highestFieldId(), good to know! Done.
Table filesTable = new FilesTable(table.ops(), table);
Types.StructType actual = filesTable.newScan().schema().select("readable_metrics").asStruct();
Just to make this a little easier in the future, you may want to do something like
firstAssigned = (schema.highestId - 15)
and then
1001 = firstAssigned + 1; ...
Not sure this really helps that much though.
Done
One remaining nit on the "highestId" call. I think overall we probably should do a refactoring of our tests for the files table in Spark, they have been really brittle to changes for a long time and I think we can do better. I think that can wait though or maybe be a task for a newcomer who wants to understand metadata tables better.
Actually wasn't there a set of tests checking that the projection was working correctly? I'm not sure I see those tests anymore but maybe I've looked at this for too long?
Would love to see a good way to simplify it without breaking the checks. Currently it compares every single field.
Thanks @RussellSpitzer @aokolnychyi @chenjunjiedada for detailed reviews
@szehon-ho @RussellSpitzer Is there any documentation about these readable metrics? Are all these metrics exposed via the files metadata tables only?
@szehon-ho The actual column names are without the 's' at the end.
Closes #4362
This adds the following columns to all files tables:
These are then a map of column_name to value.
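For example (a hedged usage sketch: the table name db.sample is a placeholder, and readable_metrics is the column name discussed earlier in this review), the new metrics can be read from the files metadata table in Spark like any other column:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
// ".files" is the files metadata table of the Iceberg table "db.sample".
Dataset<Row> files = spark.sql("SELECT file_path, readable_metrics FROM db.sample.files");
files.show(false);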