Core: exclude NaN from upper/lower bound of floating columns in Parquet/ORC #2464
Conversation
/**
 * Iceberg internally tracked field level metrics.
 */
public class FieldMetrics {
public class FieldMetrics<T> {
The change in this class overlaps with what #1963 contains. For now this class is not directly used or created by Parquet/ORC; it only will be for Avro, so it should be safe to change. The same goes for FloatFieldMetrics.
private void assertBoundValueMatch(Number expected, Map<Integer, ByteBuffer> boundMap, Types.NestedField field) {
  if (field.type().isNestedType() && fileFormat == FileFormat.ORC) {
    // we don't update floating column bounds values within ORC nested columns
We actually could make it work for ORC, but I'm not sure it's worth it given that metrics in nested types are not normally recorded or used anyway. The reason for the different behavior between Parquet and ORC is the following:
Parquet tracks column stats per block, so when computing metrics we need to update min/max while iterating through all blocks; to avoid doing that for the floating point columns whose min/max we now track ourselves, we update min/max for those columns only after all blocks have been processed.
ORC, however, tracks stats per column, so we can inject the new bound value directly when we evaluate each column's stats. And since ORC doesn't track nested column stats, we never look at columns within nested fields.
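For context, a rough sketch of the Parquet flow just described (the loop body and the argument list are illustrative assumptions; only the per-block iteration and the final updateFloatingColumnsBounds step mirror the actual change):

```java
// Parquet footer stats are stored per row group ("block"), so min/max must be
// folded across every block while iterating; the Iceberg-tracked float/double
// bounds are then applied once at the end, instead of on every block.
for (BlockMetaData block : metadata.getBlocks()) {
  for (ColumnChunkMetaData column : block.getColumns()) {
    // fold this block's column statistics (min/max, null counts, ...)
    // into the running metrics maps, keyed by field id
  }
}

// after all blocks are processed, overwrite the bounds of columns that
// Iceberg tracked itself, so NaN never leaks into float/double bounds
updateFloatingColumnsBounds(fieldMetricsMap, lowerBounds, upperBounds);
```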
I think it's okay to leave this as-is. I actually don't think that we really need these stats for Parquet either, but it's probably better to have them.
@@ -85,6 +89,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
  return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Decided not to refactor this method (at least not in this PR): in this method we are essentially updating all column stats, and moving logic out of it would mean passing in those 6 stats maps plus some extra info, which could easily push the helper method past 10 arguments.
@@ -37,6 +48,7 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;
Nit: the community prefers to avoid static method imports. There are places (usually my fault) that still import optional and required like the lines above, but we tend to discourage doing this in new pull requests. Could you remove this and just import Assert?
@@ -125,7 +134,9 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
} else if (!stats.isEmpty()) {
  increment(nullValueCounts, fieldId, stats.getNumNulls());

  if (metricsMode != MetricsModes.Counts.get()) {
    // since Parquet includes NaN for upper/lower bounds of floating point column that we don't want,
    // we have tracked metrics for such columns ourselves and thus do not need to rely on Parquet's stats.
You might want to rephrase this not to assume the reason why fieldMetricsMap contains metrics. If there are metrics gathered by Iceberg for a column, then we should use those instead of the ones from Parquet. The NaN handling for float and double columns is one case of that, but there may be others in the future.
  lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound()));
  upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound()));
} else {
  throw new IllegalStateException("[BUG] Only Float/Double column metrics should be collected by Iceberg");
Minor: I would probably rephrase this as "Expected only float or double column metrics" and throw an UnsupportedOperationException. I think that is (marginally) more clear because it isn't quite right to say that only float metrics should be collected. The code assumed that, but there isn't a reason for that to be the case other than we didn't implement the other types.
    toBufferMap(fileSchema, upperBounds));
}

private static void updateFloatingColumnsBounds(
Minor: You might consider renaming this so that it doesn't assume that Iceberg is maintaining bounds only for floating point columns. Maybe updateFromFieldMetrics?
@@ -99,6 +104,10 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
  MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping);
  Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds);

  Map<Integer, FieldMetrics> fieldMetricsMap = Optional.ofNullable(fieldMetrics)
      .map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity())))
      .orElseGet(HashMap::new);
Why not .orElse(ImmutableMap.of())? Seems strange to create a new map if field metrics are missing. And won't stream.collect(Collectors.toMap(...)) return an empty map if the stream is empty? Does this need to handle a null stream, or can it just assume that it will be non-null?
I was trying to be defensive in handling a null stream here, which is why I had this Optional wrapper and orElse, but you are right that the current implementation wouldn't produce a null stream. I'll add a precondition check for a null stream and then directly use the stream to create the map.
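A minimal sketch of that follow-up (the precondition message is an assumption; the collect call matches the revised diff quoted further down in this thread):

```java
Preconditions.checkNotNull(fieldMetrics, "fieldMetrics stream should not be null");

Map<Integer, FieldMetrics> fieldMetricsMap = fieldMetrics.collect(
    Collectors.toMap(FieldMetrics::id, Function.identity()));
```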
public FloatFieldMetrics(int id,
                         long nanValueCount) {
  super(id, 0L, 0L, nanValueCount, null, null);
public class FloatFieldMetrics extends FieldMetrics<Number> {
Since this is a public class, I think it would make more sense to split it into an implementation for float and an implementation for double and have each implementation correctly set the type that is tracked. That avoids needing to test the type of upper bound or lower bound because you know that both are going to be the same. That's an assumption that the code below makes, but it isn't necessarily the case.
Sounds good! I was hesitant to do that since I thought there would be too much duplicated code; I guess I was trying too hard to eliminate duplication...
@Override
public ByteBuffer upperBound() {
  throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
public static class DoubleFieldMetricsContext extends AbstractFloatFieldMetricsContext<Double> {
This class is suspiciously similar to the builder pattern, so I would probably just convert it to be a builder for FieldMetrics instead (see the sketch below):
- Rename DoubleFieldMetricsContext to DoubleFieldMetrics.Builder and create it with builderFor(int id)
- Rename updateMetricsContext to something shorter, like addValue
- Rename buildMetrics to just build
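For illustration, a minimal sketch of what that builder could look like (the field set, the sentinel initial values, and the FieldMetrics super-constructor shape are assumptions pieced together from the diffs in this thread, not the final implementation):

```java
public class DoubleFieldMetrics extends FieldMetrics<Double> {

  private DoubleFieldMetrics(int id, long valueCount, long nanValueCount,
                             Double lowerBound, Double upperBound) {
    // assumed super-constructor: (id, valueCount, nullValueCount,
    // nanValueCount, lowerBound, upperBound), matching the diff above
    super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound);
  }

  public static Builder builderFor(int id) {
    return new Builder(id);
  }

  public static class Builder {
    private final int id;
    private long valueCount = 0L;
    private long nanValueCount = 0L;
    // primitive bounds with sentinel initial values, so no null checks are needed
    private double lowerBound = Double.POSITIVE_INFINITY;
    private double upperBound = Double.NEGATIVE_INFINITY;

    private Builder(int id) {
      this.id = id;
    }

    // the shorter name suggested in place of updateMetricsContext
    public void addValue(double value) {
      this.valueCount += 1;
      if (Double.isNaN(value)) {
        this.nanValueCount += 1;
      } else {
        if (Double.compare(value, lowerBound) < 0) {
          this.lowerBound = value;
        }
        if (Double.compare(value, upperBound) > 0) {
          this.upperBound = value;
        }
      }
    }

    public DoubleFieldMetrics build() {
      // bounds are only meaningful if at least one non-NaN value was seen
      boolean hasBounds = valueCount - nanValueCount > 0;
      return new DoubleFieldMetrics(id, valueCount, nanValueCount,
          hasBounds ? (Double) lowerBound : null, hasBounds ? (Double) upperBound : null);
    }
  }
}
```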
@Override
public long valueCount() {
  throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
We typically prefer UnsupportedOperationException for cases like this rather than IllegalStateException, which is described in javadoc as an exception that "Signals that a method has been invoked at an illegal or inappropriate time". While you could argue that any time is an "inappropriate time", I think unsupported describes the situation better.
Thinking about this a bit more, would it be a bad thing to collect the stats so that future uses don't need to go back and modify this class? Seems like there isn't much benefit to skipping the implementation for just these two methods.
We had to spend some extra effort on nullable writers to make them update null count and value count in order to get the right results for the previously unimplemented methods, but that's not complicated, and I will update.
if (Double.isNaN(value)) {
  this.nanValueCount++;
} else {
  if (lowerBound == null || Double.compare(value, lowerBound) < 0) {
I mentioned this on the Avro PR, but I think it would be better to make lowerBound and upperBound primitives and get rid of the null checks here. Then you can use valueCount - nullValueCount > 0 to determine whether to return the bounds or null.
Sorry, I haven't checked the Avro PR yet so I didn't see that comment... the writers creating this already have null values filtered out, so the null value count is always 0, but the idea could still work. Will update!
@@ -99,6 +105,9 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
  MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping);
  Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds);

  Map<Integer, FieldMetrics> fieldMetricsMap = fieldMetrics.collect(
FieldMetrics is parameterized, right? I think this should be FieldMetrics<?>.
I will modify the ones for Parquet and ORC, and leave the Avro-related ones as-is, since we are going to modify them in the Avro metrics support PR anyway. Let me know if that works!
  // upper and lower bounds will either both be null or neither will be
  lowerBounds.remove(fieldId);
  upperBounds.remove(fieldId);
} else if (metrics.upperBound() instanceof Float) {
I think this should be metrics instanceof FloatFieldMetrics instead of checking the bound.
Actually, due to having to modify the option writer to return the correct value count/null value count, this can go in different directions:
- we either let the option writer explicitly check for float/double writers and create the corresponding field metric object type (which also means making the float/double field metrics' constructors public),
- or we create the generic field metric type in the option writer, and continue to check instanceof on the bounds here.
For now I decided on the latter for better flexibility, but I don't have a strong opinion either way. Please let me know your thoughts!
Oh, I see. Good point. Let's go with this for now. In the future, we could keep the value class in the metrics, so you could do something like this:
if (metrics.javaType() == Float.class) { ... }
But that's probably not worth it at this point.
// only check for MetricsModes.None, since we don't truncate float/double values.
if (metricsMode != MetricsModes.None.get()) {
  if (metrics.upperBound() == null) {
Can metrics expose a better way to detect this than null? What about hasBounds()?
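A tiny sketch of that idea on FieldMetrics (hypothetical; it assumes the two bounds are always set or cleared together, as the code above does):

```java
public boolean hasBounds() {
  // lower and upper bounds are maintained together, so one check suffices
  return upperBound != null;
}
```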
@Override
public void nullWrite() {
  nullValueCount++;
It seems like it may be a good idea to move null tracking into the builder, so you could just run metricsBuilder.addValue(null) or metricsBuilder.addNullValue(). Then you wouldn't need to rebuild the metrics later.
I made a few comments, but I don't think there are any issues with this that need to be fixed, so I merged it. Thanks @yyanyy!