
Core: Add readable metrics columns to files metadata tables #5376

Merged: szehon-ho merged 16 commits into apache:master from readable_metrics on Dec 5, 2022

Conversation

@szehon-ho (Collaborator) commented Jul 28, 2022

Closes #4362

This adds the following columns to all files tables:

  • readable_metrics, which is a struct of:
    • column_sizes
    • value_counts
    • null_value_counts
    • nan_value_counts
    • lower_bounds
    • upper_bounds

Each of these is a map from column name to value.
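
For illustration, querying the new column from Spark might look like this (a sketch only: db.t is an example table, and a SparkSession named spark is assumed):

  // Show the human-readable metrics alongside the file path
  spark.sql("SELECT file_path, readable_metrics FROM db.t.files").show(false);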

return Optional.of(
    Transforms.identity(field.type())
        .toHumanString(Conversions.fromByteBuffer(field.type(), value)));
} catch (Exception e) { // Ignore
  return Optional.empty();
@szehon-ho (Collaborator, Author) commented Aug 1, 2022


This happens in some cases; I found it when importing external files into an Iceberg table, e.g. TestIcebergSourceHadoopTables.testFilesTableWithSnapshotIdInheritance, where I think the columns are out of order relative to the original schema and the metrics are corrupt (an underflow exception in this case).

I'm not sure we should fail the files table in that case; I was leaning towards just returning null. The user still has the original metrics column to see why the error happened.

@szehon-ho force-pushed the readable_metrics branch 2 times, most recently from 223a3ad to 936e2ea, on August 2, 2022
@szehon-ho (Collaborator, Author) commented:

All Spark tests are updated/fixed now.

FYI @RussellSpitzer @aokolnychyi @rdblue if you have time to leave some feedback. There are new tests added, but the diff is a bit big for the 'Files changed' view to show them: TestMetadataTableMetricsColumns.java

return Optional.of(
    Transforms.identity(field.type())
        .toHumanString(Conversions.fromByteBuffer(field.type(), value)));
} catch (Exception e) { // Ignore
Contributor commented:


Do you have examples when this throws an exception?

@szehon-ho (Collaborator, Author) commented:


Yea, I tried to put a comment, but unfortunately it got disassociated from this line after a rebase.

This happens in some cases; I found it when importing external files into an Iceberg table, e.g. TestIcebergSourceHadoopTables.testFilesTableWithSnapshotIdInheritance, where I think the columns are out of order relative to the original schema and the metrics are corrupt (an underflow exception in this case).

I'm not sure we should fail the files table in that case; I was leaning towards just returning null. The user still has the original metrics column to see why the error happened.

@szehon-ho (Collaborator, Author) commented:


@aokolnychyi filed an issue: #5543

@szehon-ho (Collaborator, Author) commented:


Following up on this: it's a non-issue, as the Spark procedures set the flag schema.name-mapping.default; this test just did not. Fixed the test.

@aokolnychyi (Contributor) commented:

It seems like a great idea to add readable metrics. It is hard to make sense of them otherwise.

@szehon-ho, what do you think about adding a single map column, let's say called readable_metrics, that will hold a mapping from a column name to a struct representing the metrics? The type will be Map<String, StructType> and we will have individual struct fields for each type of metric.

We can then easily access them via SQL.

SELECT readable_metrics['col1'].lower_bound FROM db.t.files

I am okay with individual columns too but it seems a bit cleaner to just have one.
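
For illustration, such a type might be expressed with Iceberg's Types API roughly as follows (field names and IDs here are illustrative sketches, not the final merged schema):

  Types.MapType.ofRequired(
      1, 2,
      Types.StringType.get(),
      Types.StructType.of(
          Types.NestedField.optional(3, "column_size", Types.LongType.get()),
          Types.NestedField.optional(4, "value_count", Types.LongType.get()),
          Types.NestedField.optional(5, "null_value_count", Types.LongType.get()),
          Types.NestedField.optional(6, "nan_value_count", Types.LongType.get()),
          Types.NestedField.optional(7, "lower_bound", Types.StringType.get()),
          Types.NestedField.optional(8, "upper_bound", Types.StringType.get())))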

@aokolnychyi (Contributor) commented:

Let me check in a bit.

The github-actions bot added the API label on Aug 19, 2022
@aokolnychyi (Contributor) commented:

Let me take a look today.

@aokolnychyi (Contributor) left a review:


Looks really close to me.

@aokolnychyi (Contributor) commented:

Let me take a look now.

@@ -148,6 +148,16 @@ public static Schema join(Schema left, Schema right) {
return new Schema(joinedColumns);
}

public static Schema joinCommon(Schema left, Schema right) {
Contributor commented:


What about simply adapting the existing join method? Are there any scenarios where we want to skip the validation and simply add all columns (old logic)?

return CloseableIterable.transform(files(projection), file -> (StructLike) file);
} else {
Schema fileProjection = TypeUtil.selectNot(projection, READABLE_METRICS_FIELD_IDS);
Schema minProjection =
Contributor commented:


I think this logic should be part of BaseFilesTableScan and BaseAllFilesTableScan.
Otherwise, our scans won't report the correct schema in Scan#schema().

@szehon-ho (Collaborator, Author) commented Aug 29, 2022


I think putting it there would break the scan, as it's not the projection the user requested.

Note, this is actually a bit subtle. Because we are doing the join (original projection + minimum metrics), the file's schema becomes:
{any projected field on the file} : {readable_metrics, because it is also projected} : {un-projected but required metrics fields}

So ContentFileWithMetrics works because it will discard any of the "un-projected but required metrics fields", given they are outside the range it will read. For the remaining fields it uses the existing logic (delegate to the file for the first n-1 fields, and then get the nth field from MetricsStruct).

I mean, we could add a select method to GenericDataFile to modify its internal 'fromProjectionPos' map to conform back to the original projection (dropping the "un-projected but required metrics fields"), but it would mainly be for clarity and is not strictly needed.
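
A rough sketch of the schema composition being described (names follow the snippet above; MIN_METRICS_SCHEMA stands in for whatever constant holds the required raw metrics fields, so treat it as hypothetical):

  // Drop the readable_metrics fields from the user's requested projection
  Schema fileProjection = TypeUtil.selectNot(projection, READABLE_METRICS_FIELD_IDS);
  // Join back the raw metrics columns needed to compute readable_metrics,
  // even when the user did not project them
  Schema minProjection = TypeUtil.join(fileProjection, MIN_METRICS_SCHEMA);
  // Rows are read with minProjection; ContentFileWithMetrics then exposes only the
  // originally projected fields plus the computed readable_metrics struct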

@szehon-ho (Collaborator, Author) commented:

Added an additional test; it looks like it works even when the readable_metrics column is selected before other columns (Spark somehow requests the rows in their original order).

@@ -142,9 +142,21 @@ public static Schema selectNot(Schema schema, Set<Integer> fieldIds) {
}

public static Schema join(Schema left, Schema right) {
List<Types.NestedField> joinedColumns = Lists.newArrayList();
joinedColumns.addAll(left.columns());
Collaborator commented:


nit: This changes the original behavior, why not add a new function?

@szehon-ho (Collaborator, Author) commented:


@chenjunjiedada Yea, that was my original version; I changed it after @aokolnychyi's comment: #5376 (comment)

Member commented:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this is changing a public API which previously would have allowed these combos. This is ... maybe ok since it's a utility method but we may end up breaking users of the function at runtime. That said I think Anton is right and any schema with multiple columns with the same ID would always be wrong.
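
For context, a minimal sketch of what the adapted join with duplicate-ID validation might look like (not the exact merged code; the Preconditions check is illustrative):

  public static Schema join(Schema left, Schema right) {
    List<Types.NestedField> joinedColumns = Lists.newArrayList(left.columns());
    for (Types.NestedField field : right.columns()) {
      // Reject schemas that share a field ID instead of silently concatenating them
      Preconditions.checkArgument(
          left.findField(field.fieldId()) == null,
          "Cannot join schemas: both contain field id %s", field.fieldId());
      joinedColumns.add(field);
    }
    return new Schema(joinedColumns);
  }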

@chenjunjiedada (Collaborator) commented:

Really nice PR, thanks @szehon-ho and @aokolnychyi for the effort! When can we merge this? I think it is ready; it has been two months since the last review, and leaving it will only lead to more conflicts.

@szehon-ho force-pushed the readable_metrics branch 2 times, most recently from 21c7205 to 0e68ae3, on November 3, 2022
@szehon-ho (Collaborator, Author) commented:

@RussellSpitzer addressed the comments, thanks!

@szehon-ho (Collaborator, Author) commented:

Actually, hold on a second; I'm looking at a small refactor to make this more generic, so that a readable_metric definition can be added in the future.

@szehon-ho (Collaborator, Author) commented:

@RussellSpitzer should be good now for another look when you get a chance, thanks!

@@ -486,6 +489,11 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

NameMapping mapping = MappingUtil.create(table.schema());
@szehon-ho (Collaborator, Author) commented:


Without these lines, the test used to write the wrong metrics to the imported table.
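
For reference, a sketch of how a test can set that flag (the exact added test code may differ; this just shows the schema.name-mapping.default property mentioned earlier being populated from a NameMapping):

  NameMapping mapping = MappingUtil.create(table.schema());
  // Set schema.name-mapping.default so imported files without field IDs map to the table schema
  table
      .updateProperties()
      .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
      .commit();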

public static Schema readableMetricsSchema(Schema dataTableSchema, Schema metadataTableSchema) {
List<Types.NestedField> fields = Lists.newArrayList();
Map<Integer, String> idToName = dataTableSchema.idToName();
AtomicInteger nextId =
Member commented:


I don't believe this has to be atomic, and metadataTableSchema should already have a highestFieldId() method which, according to the doc, includes nested fields.

Member commented:


If you prefer incrementAndGet() to ++nextId, though, I think using the Atomic just for readability is probably fine.

@szehon-ho (Collaborator, Author) commented Dec 2, 2022


Ah, the AtomicInteger is there because a lambda (the map) is used inside, and the compiler complains:

Variable used in lambda expression should be final or effectively final
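
A minimal illustration of that constraint (the variable and collection names here are made up; assumes java.util and java.util.concurrent.atomic imports):

  Map<String, Integer> ids = new HashMap<>();
  // A plain `int nextId` could not be incremented inside the lambda below,
  // because a captured local must be final or effectively final.
  // The AtomicInteger reference itself stays effectively final while its value is mutable.
  AtomicInteger nextId = new AtomicInteger(0);
  List.of("column_size", "value_count").forEach(name -> ids.put(name, nextId.incrementAndGet()));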

@szehon-ho (Collaborator, Author) commented:


Re: highestFieldId(), good to know! Done.


Table filesTable = new FilesTable(table.ops(), table);
Types.StructType actual = filesTable.newScan().schema().select("readable_metrics").asStruct();

Member commented:


Just to make this a little easier in the future, you may just want to do something like:

  firstAssigned = (schema.highestId - 15)

and then:

  1001 = firstAssigned + 1; ...

Not sure this really helps that much though.

@szehon-ho (Collaborator, Author) commented:


Done

@RussellSpitzer (Member) left a review:


One remaining nit on the "highestId" call. I think overall we should probably refactor our tests for the files table in Spark; they have been really brittle to changes for a long time, and I think we can do better. That can wait, though, or maybe be a task for a newcomer who wants to understand metadata tables better.

@RussellSpitzer (Member) left a review:


Actually, wasn't there a set of tests checking that the projection was working correctly? I'm not sure I see those tests anymore, but maybe I've looked at this for too long?

@szehon-ho (Collaborator, Author) commented:
Would love to see a good way to simplify it without breaking the checks; currently it compares every single field.

@szehon-ho merged commit 9a00f74 into apache:master on Dec 5, 2022
@szehon-ho (Collaborator, Author) commented:

Thanks @RussellSpitzer, @aokolnychyi, and @chenjunjiedada for the detailed reviews!

sunchao pushed a commit to sunchao/iceberg that referenced this pull request on May 10, 2023 (apache#861), co-authored by Szehon Ho <szehon.apache@gmail.com>
@atifiu commented Sep 17, 2023

@szehon-ho @RussellSpitzer Is there any documentation about these readable metrics? Are all of these metrics exposed only through the files metadata tables?

@atifiu commented Sep 17, 2023

Quoting the description's list of readable_metrics fields: column_sizes, value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds.

@szehon-ho The actual column names do not have an 's' at the end (i.e. column_size, value_count, and so on).
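
For example, with the singular names the metrics can be queried like this (a sketch: db.t and col1 are illustrative, and a SparkSession named spark is assumed):

  spark
      .sql("SELECT readable_metrics.col1.lower_bound, readable_metrics.col1.null_value_count "
          + "FROM db.t.files")
      .show(false);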

Linked issue closed by this pull request: Expose human-readable metrics in metadata tables (#4362)