
Core: exclude NaN from upper/lower bound of floating columns in Parquet/ORC #2464

Merged (3 commits) on Jun 24, 2021

Conversation

@yyanyy (Contributor, Author) commented Apr 12, 2021

/**
* Iceberg internally tracked field level metrics.
*/
-public class FieldMetrics {
+public class FieldMetrics<T> {
@yyanyy (Contributor, Author):

The change in this class overlaps with what #1963 contains. For now this class is not directly used or created by Parquet/ORC, but it will be for Avro, so it should be safe to change. The same goes for FloatFieldMetrics.


private void assertBoundValueMatch(Number expected, Map<Integer, ByteBuffer> boundMap, Types.NestedField field) {
if (field.type().isNestedType() && fileFormat == FileFormat.ORC) {
// we don't update floating column bounds values within ORC nested columns
@yyanyy (Contributor, Author):

We actually could make it work for ORC, but I'm not sure it's worth it, given that metrics in nested types are normally not recorded or used anyway. The reason for the different behavior between Parquet and ORC is the following:

Parquet tracks column stats per block, so when computing metrics we need to update min/max while iterating through all blocks; to avoid doing that for the floating point columns whose min/max we now track ourselves, we update min/max for those columns only after all blocks have been processed.

ORC, however, tracks stats per column, so we can inject the new bound value directly when we evaluate each column's stats. And since ORC doesn't track nested column stats, we never look at columns within nested fields.

@rdblue (Contributor):

I think it's okay to leave this as-is. I actually don't think that we really need these stats for Parquet either, but it's probably better to have them.
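
To make the ordering constraint in the author's explanation concrete, here is a minimal, self-contained sketch (not the PR's code; all names are hypothetical) of why Parquet needs the post-pass: per-block stats are folded first, and only then do the Iceberg-tracked bounds replace them for floating point columns.

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public class BoundsMergeSketch {
    public static void main(String[] args) {
      // per-block lower bounds keyed by field id: two row groups, one double column;
      // NaN can leak into Parquet's block-level stats
      List<Map<Integer, Double>> perBlockMins = List.of(
          Map.of(1, 0.5), Map.of(1, Double.NaN));
      // bounds Iceberg tracked itself while writing, with NaN already excluded
      Map<Integer, Double> icebergMins = Map.of(1, 0.5);

      Map<Integer, Double> lowerBounds = new HashMap<>();
      for (Map<Integer, Double> blockMins : perBlockMins) {
        // folding across blocks: Math.min propagates NaN, poisoning the bound
        blockMins.forEach((id, min) -> lowerBounds.merge(id, min, Math::min));
      }
      // post-pass after all blocks: floating point columns take Iceberg's own bounds
      lowerBounds.putAll(icebergMins);

      System.out.println(lowerBounds); // {1=0.5}, NaN excluded
    }
  }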

@@ -85,6 +89,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
@yyanyy (Contributor, Author):

I decided not to refactor this method (at least not in this PR): it essentially updates all column stats, and moving logic out of it would mean passing those 6 stats maps plus some extra info into a helper, which could easily push the helper past 10 arguments.

@yyanyy yyanyy changed the title [Core] exclude NaN from upper/lower bound of floating columns in Parquet/ORC Core: exclude NaN from upper/lower bound of floating columns in Parquet/ORC Apr 12, 2021
@rdblue rdblue added this to the Java 0.12.0 Release milestone Jun 15, 2021
@@ -37,6 +48,7 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;
@rdblue (Contributor):

Nit: the community prefers to avoid static method imports. There are places (usually my fault) that still import optional and required like the lines above, but we tend to discourage doing this in new pull requests. Could you remove this and just import Assert?
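
For reference, the preferred style looks like this (a trivial illustration; the variable names are made up):

  import org.junit.Assert;

  // call through the Assert class rather than statically importing assertEquals
  Assert.assertEquals(expectedBounds, actualBounds);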

@@ -125,7 +134,9 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
} else if (!stats.isEmpty()) {
increment(nullValueCounts, fieldId, stats.getNumNulls());

if (metricsMode != MetricsModes.Counts.get()) {
// since Parquet includes NaN for upper/lower bounds of floating point column that we don't want,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Parquet's stats.
@rdblue (Contributor):

You might want to rephrase this not to assume the reason why fieldMetricsMap contains metrics. If there are metrics gathered by Iceberg for a column, then we should use those instead of the ones from Parquet. The NaN handling for float and double columns is one case of that. But there may be others in the future.

lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound()));
upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound()));
} else {
throw new IllegalStateException("[BUG] Only Float/Double column metrics should be collected by Iceberg");
@rdblue (Contributor):

Minor: I would probably rephrase this as "Expected only float or double column metrics" and throw an UnsupportedOperationException. I think that is (marginally) more clear because it isn't quite right to say that only float metrics should be collected. The code assumed that, but there isn't a reason for that to be the case other than we didn't implement the other types.
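
The suggested replacement, as a one-line sketch:

  // "unsupported" fits better: other types simply aren't implemented, not an illegal state
  throw new UnsupportedOperationException("Expected only float or double column metrics");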

toBufferMap(fileSchema, upperBounds));
}

private static void updateFloatingColumnsBounds(
@rdblue (Contributor):

Minor: You might consider renaming this so that it doesn't assume that Iceberg is maintaining bounds only for floating point columns. Maybe updateFromFieldMetrics?

@@ -99,6 +104,10 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping);
Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds);

Map<Integer, FieldMetrics> fieldMetricsMap = Optional.ofNullable(fieldMetrics)
.map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity())))
.orElseGet(HashMap::new);
@rdblue (Contributor):

Why not .orElse(ImmutableMap.of())? Seems strange to create a new map if field metrics are missing. And won't stream.collect(Collectors.toMap(...)) return an empty map if the stream is empty? Does this need to handle a null stream or can it just assume that it will be non-null?

@yyanyy (Contributor, Author):

I was trying to be defensive about handling a null stream here, which is why I had this Optional wrapper and orElse, but you are right that the current implementation wouldn't produce a null stream. I'll add a precondition check for a null stream and then use the stream directly to create the map.
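
A sketch of the follow-up described here; the Preconditions import is an assumption based on Iceberg's relocated Guava packages:

  import java.util.Map;
  import java.util.function.Function;
  import java.util.stream.Collectors;
  import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

  // fail fast on a null stream instead of defensively wrapping in Optional
  Preconditions.checkNotNull(fieldMetrics, "fieldMetrics stream should not be null");
  Map<Integer, FieldMetrics> fieldMetricsMap = fieldMetrics.collect(
      Collectors.toMap(FieldMetrics::id, Function.identity()));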

public FloatFieldMetrics(int id,
long nanValueCount) {
super(id, 0L, 0L, nanValueCount, null, null);
public class FloatFieldMetrics extends FieldMetrics<Number> {
@rdblue (Contributor):

Since this is a public class, I think it would make more sense to split it into an implementation for float and an implementation for double and have each implementation correctly set the type that is tracked. That avoids needing to test the type of upper bound or lower bound because you know that both are going to be the same. That's an assumption that the code below makes, but it isn't necessarily the case.

@yyanyy (Contributor, Author):

Sounds good! I was hesitant to do that since I thought it would mean too much duplicated code; I guess I was trying too hard to eliminate duplication...

@Override
public ByteBuffer upperBound() {
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
public static class DoubleFieldMetricsContext extends AbstractFloatFieldMetricsContext<Double> {
@rdblue (Contributor):

This class is suspiciously similar to the builder pattern, so I would probably just convert it to be a builder for FieldMetrics instead:

  • Rename DoubleFieldMetricsContext to DoubleFieldMetrics.Builder and create it with builderFor(int id)
  • Rename updateMetricsContext to something shorter, like addValue
  • Rename buildMetrics to just build
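
A rough sketch of that shape, as an illustration rather than the merged code: builderFor, addValue, and build follow the review suggestions, and the super constructor call is inferred from the FloatFieldMetrics diff above (the exact parameter order there is an assumption).

  public class DoubleFieldMetrics extends FieldMetrics<Double> {

    private DoubleFieldMetrics(int id, long valueCount, long nanValueCount,
                               Double lowerBound, Double upperBound) {
      // nulls are filtered out before values reach this metric, so nullValueCount is 0
      super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound);
    }

    public static Builder builderFor(int id) {
      return new Builder(id);
    }

    public static class Builder {
      private final int id;
      private long valueCount = 0L;
      private long nanValueCount = 0L;
      // primitives with sentinel starting points avoid null checks on every update
      private double lowerBound = Double.POSITIVE_INFINITY;
      private double upperBound = Double.NEGATIVE_INFINITY;

      private Builder(int id) {
        this.id = id;
      }

      public Builder addValue(double value) {
        this.valueCount++;
        if (Double.isNaN(value)) {
          this.nanValueCount++;
        } else {
          this.lowerBound = Math.min(lowerBound, value);
          this.upperBound = Math.max(upperBound, value);
        }
        return this;
      }

      public DoubleFieldMetrics build() {
        // bounds are only meaningful if at least one non-NaN value was seen
        boolean hasBound = valueCount - nanValueCount > 0;
        return new DoubleFieldMetrics(id, valueCount, nanValueCount,
            hasBound ? lowerBound : null, hasBound ? upperBound : null);
      }
    }
  }

A writer would then keep one Builder per float/double column and call build() once when the file is closed.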


@Override
public long valueCount() {
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
@rdblue (Contributor):

We typically prefer UnsupportedOperationException for cases like this rather than IllegalStateException, which is described in javadoc as an exception that "Signals that a method has been invoked at an illegal or inappropriate time". While you could argue that any time is an "inappropriate time", I think unsupported describes the situation better.

@rdblue (Contributor):

Thinking about this a bit more, would it be a bad thing to collect the stats so that future uses don't need to go back and modify this class? Seems like there isn't much benefit to skipping the implementation for just these two methods.

@yyanyy (Contributor, Author):

We had to spend some extra effort on the nullable writers to make them update null count and value count in order to get the right results for the previously unimplemented methods, but that's not complicated; will update.

if (Double.isNaN(value)) {
this.nanValueCount++;
} else {
if (lowerBound == null || Double.compare(value, lowerBound) < 0) {
@rdblue (Contributor) commented Jun 21, 2021:

I mentioned this on the Avro PR, but I think it would be better to make lowerBound and upperBound primitives and get rid of the null checks here. Then you can use valueCount - nullValueCount > 0 to determine whether to return the bounds or null.

@yyanyy (Contributor, Author):

Sorry, I haven't checked the Avro PR yet, so I didn't see that comment... The writers creating this already have null values filtered out, so the null value count is always 0, but the idea could still work. Will update!
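
Since nulls are filtered out before these writers, the reviewer's valueCount - nullValueCount check reduces to counting non-NaN values. A minimal sketch with hypothetical field names:

  private double lowerBound = Double.POSITIVE_INFINITY;  // primitive: no null checks while updating
  private long valueCount = 0L;
  private long nanValueCount = 0L;

  public Double lowerBound() {
    // expose a bound only if at least one non-NaN value was observed
    return valueCount - nanValueCount > 0 ? lowerBound : null;
  }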

@@ -99,6 +105,9 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping);
Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds);

Map<Integer, FieldMetrics> fieldMetricsMap = fieldMetrics.collect(
@rdblue (Contributor):

FieldMetrics is parameterized, right? I think this should be FieldMetrics<?>.

@yyanyy (Contributor, Author):

I will modify the ones for Parquet and ORC and leave the Avro-related ones as-is, since we are going to modify them in the Avro metrics support PR anyway. Let me know if that works!

// upper and lower bounds will either both be null or neither will be
lowerBounds.remove(fieldId);
upperBounds.remove(fieldId);
} else if (metrics.upperBound() instanceof Float) {
@rdblue (Contributor):

I think this should be metrics instanceof FloatFieldMetrics instead of checking the bound.

@yyanyy (Contributor, Author):

Actually, due to having to modify the option writer to return the correct value count/null value count, this can go in different directions:

  • we either let the option writer explicitly check for the float/double writer and create the corresponding field metrics object type (which also means making the float/double field metrics constructors public),
  • or we create the generic field metrics type in the option writer and continue to check instanceof on the bounds here.

For now I decided on the latter for better flexibility, but I don't have a strong opinion either way. Please let me know your thoughts!

@rdblue (Contributor):

Oh, I see. Good point. Let's go with this for now. In the future, we could keep the value class in the metrics, so you could do something like this:

  if (metrics.javaType() instanceof Float) { ... }

But that's probably not worth it at this point.
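
Putting this thread together with the diff above, the dispatch ends up roughly as follows (a sketch, not the exact merged code):

  if (metrics.upperBound() instanceof Float) {
    lowerBounds.put(fieldId, Literal.of((Float) metrics.lowerBound()));
    upperBounds.put(fieldId, Literal.of((Float) metrics.upperBound()));
  } else if (metrics.upperBound() instanceof Double) {
    lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound()));
    upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound()));
  } else {
    throw new UnsupportedOperationException("Expected only float or double column metrics");
  }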


// only check for MetricsModes.None, since we don't truncate float/double values.
if (metricsMode != MetricsModes.None.get()) {
if (metrics.upperBound() == null) {
@rdblue (Contributor):

Can metrics expose a better way to detect this than null? What about hasBounds()?
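
For example, a hasBounds() accessor (hypothetical, not part of the merged API) would let callers avoid the raw null check:

  // bounds are set together, so checking one side is enough
  public boolean hasBounds() {
    return upperBound != null;
  }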

@github-actions github-actions bot added the flink label Jun 24, 2021

@Override
public void nullWrite() {
nullValueCount++;
@rdblue (Contributor):

It seems like it may be a good idea to move null tracking into the builder, so you could just run metricsBuilder.addValue(null) or metricsBuilder.addNullValue(). Then you wouldn't need to rebuild the metrics later.
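
A sketch of that idea, assuming a hypothetical addNullValue() method on the metrics builder (this would supersede hard-coding the null count to zero at build time):

  @Override
  public void nullWrite() {
    // the builder owns null/value counts, so the metrics never need rebuilding later
    metricsBuilder.addNullValue();
  }

  public Builder addNullValue() {
    this.valueCount++;       // assumption: Iceberg's value counts include nulls
    this.nullValueCount++;
    return this;
  }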

@rdblue rdblue merged commit 2e46847 into apache:master Jun 24, 2021
@rdblue (Contributor) commented Jun 24, 2021

I made a few comments, but I don't think there are any issues with this that need to be fixed so I merged it. Thanks @yyanyy!
