Refactor reusable map data in Spark and Flink parquet readers #1331 #1572
Conversation
package org.apache.iceberg.parquet;

public interface ReusableArrayData {
Was it necessary to make this an interface? Because this does allocation, it seems much harder to make it an interface than to make it an abstract class that handles allocation and the values array internally.
Yeah, the problem is Spark's base class is an abstract class and Flink's is an interface :/
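A minimal sketch of the constraint under discussion, with stand-in names (SparkStyleArrayData, FlinkStyleArrayData, and SharedReusable are illustrative, not the actual classes):

```java
// Stand-ins for the real framework types: Spark's ArrayData is an abstract
// class, while Flink's ArrayData is an interface.
abstract class SparkStyleArrayData {
  public abstract int numElements();
}

interface FlinkStyleArrayData {
  int size();
}

// Shared reuse logic has to live in an interface (with default methods),
// because the Spark subclass has already spent its single "extends" slot.
interface SharedReusable {
  int capacity();
}

class SparkSide extends SparkStyleArrayData implements SharedReusable {
  @Override
  public int numElements() { return 0; }

  @Override
  public int capacity() { return 0; }
}

class FlinkSide implements FlinkStyleArrayData, SharedReusable {
  @Override
  public int size() { return 0; }

  @Override
  public int capacity() { return 0; }
}
```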
I was thinking about this today. Originally I had assumed that I could not have the flink one inherit from Spark because flink & spark wouldn't want to take dependencies on each other. But since the flink one is an interface, I'll give it a shot and see if I can make this an abstract class.
Circled back, and yeah I think this does have to be an interface :/
Yes, I think so as well. That just means I'll need to be a bit more careful with the review since it is more complicated than I thought. I should have time today, hopefully.
Is it possible to design ReusableArrayData as a generic class, ReusableArrayData<T>? Then FlinkReusableArrayData and SparkReusableArrayData would each have a private member of type ReusableArrayData<Object>, and both could delegate methods such as capacity(), setNumElements(), and getNumElements() to it. Making ReusableArrayData an interface just confused me a bit; to me it should have a complete implementation, even though its type is generic.
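A rough sketch of the delegation idea proposed above, assuming illustrative names and bodies rather than the actual Iceberg code:

```java
import java.util.Arrays;

// A concrete generic holder that owns the buffer and the element count.
class ReusableArrayData<T> {
  private Object[] values = new Object[16];
  private int numElements = 0;

  int capacity() { return values.length; }
  int getNumElements() { return numElements; }
  void setNumElements(int n) { numElements = n; }
  void grow() { values = Arrays.copyOf(values, values.length * 2); }
}

// Each engine-specific class keeps a private member and delegates to it.
class SparkReusableArrayData /* extends Spark's abstract ArrayData */ {
  private final ReusableArrayData<Object> data = new ReusableArrayData<>();

  public int numElements() { return data.getNumElements(); }  // delegation
}
```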
I don't think so, since we can't do multiple inheritance with the class.
}
keys().update(size(), key);
values().update(size(), value);
setNumElements(size() + 1);
Does this need to be set every time a new pair is added? I think it would be better to set the valid size just before returning, to avoid the extra work.
I might be misinterpreting your question/concern, but it seems like at least in SparkReusableMapData this is already the case. If I'm reading this correctly, addPair is overridden and setNumElements is only called in buildMap. Again, I might be misinterpreting the question/concern, but for a default implementation this seems fine, given that it's overridden on the Spark side and on the Flink side too (where there's a buildMap function).
private static class MapReader<K, V> extends RepeatedKeyValueReader<MapData, SparkReusableMapData, K, V> {
  // Removed to the bare minimum ....

  @Override
  protected void addPair(SparkReusableMapData map, K key, V value) {
    // grow the reused buffers as needed, but don't touch the element count here
    if (writePos >= map.capacity()) {
      map.grow();
    }
    map.keys.values[writePos] = key;
    map.values.values[writePos] = value;
    writePos += 1;
  }

  @Override
  protected MapData buildMap(SparkReusableMapData map) {
    // the valid size is set once, just before the map is returned
    map.setNumElements(writePos);
    return map;
  }
}
Yes, my point is that we don't need to set the number of valid elements until the end, just before MapData is returned.
So yes, we could do that, but we'd be keeping track of the write position in the MapReader, so this just moves the grow code into the base class. I can swap it back around though.
Oh right, because this is an interface we need to call a method anyway, since we can't keep concrete state in the interface.
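A small sketch of the limitation being referenced (illustrative names): a field declared in a Java interface is implicitly static and final, so per-instance state has to live in the implementing class and be reached through accessor methods.

```java
interface ReusableData {
  // int writePos;                // would be implicitly static final, so
                                  // per-instance state cannot live here

  int size();                     // state is reached via virtual calls instead
  void setNumElements(int numElements);

  default void addElement() {
    setNumElements(size() + 1);   // every access to shared state is a method call
  }
}
```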
@@ -655,6 +653,11 @@ public int numElements() {
  return numElements;
}

@Override
public int getNumElements() {
Before, this was called numElements. Could you revert the name change?
It looks like the function numElements is still there, just above this one.
It looks like the function numElements is still there (it's part of the catalyst abstract class), in the lines just above these ones. But unless there's something going on with Spark, I agree with @rdblue. I looked to see whether getNumElements was needed on the Flink side: Flink strongly favors POJOs for performance reasons in its own serialization, and for style reasons it tends to use POJO-style getter names even when they aren't strictly needed at the moment, but the Flink ArrayData interface uses size instead. So keeping the Iceberg ReusableArrayData interface on numElements would kill two birds with one stone by also fulfilling the catalyst contract. But maybe there's something we're not seeing.
Yeah, this is a good point. I'll simplify the interface so we don't have both numElements and getNumElements.
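For illustration only, a sketch of what the simplified interface could look like, keeping the single catalyst-compatible name (not the final code):

```java
interface ReusableArrayData {
  // Single accessor named to match Spark's catalyst ArrayData contract;
  // Flink's ArrayData uses size(), which can delegate to this.
  int numElements();

  void setNumElements(int numElements);
}
```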
This is an important task, so thank you for taking it on @holdenk.
Thanks y'all for the reviews. Sorry I'm a little slow responding to this; I'm currently dealing with a race condition inside of some new Spark code, but I'm hoping to circle back to this PR before the end of the week.
protected RowData buildStruct(GenericRowData struct) {
  return struct;
}

private static class FlinkReusableMapData implements ReusableMapData, MapData {
Was the order of classes changed? It looks like that may be why there are more changes in this diff.
Let me see if I can minimize the changes in the diff this weekend.
values().setNumElements(numElements);
}

int size();
nit: how about making this method return keys().getNumElements() by default? Then we might not have to implement it in the subclasses.
I tried that; it doesn't work, since size is required by the Spark/Flink class/interface.
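One way to see why the default doesn't help, sketched with stand-in names: when a class inherits size() both as a default from one interface and as an abstract declaration from another (or from a superclass, on the Spark side), Java still requires the class to supply its own implementation.

```java
interface ReusableMapData {
  default int size() { return 0; }  // hypothetical default, as in the nit above
}

interface EngineMapData {
  int size();                       // stand-in for the engine's own contract
}

// Without the override below, javac rejects this class: it inherits an
// abstract size() and an unrelated default size() with the same signature.
class EngineReusableMapData implements ReusableMapData, EngineMapData {
  @Override
  public int size() { return 0; }   // the subclass must implement it anyway
}
```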
…e common code to interface with default methods to allow multiple inheritance. (c62215b to d7c33c5)
I just talked with @holdenk directly about this and we agreed that it probably isn't the right direction to go because the shared code needs to be carried by an interface instead of an abstract class. The interface can't handle its own state, so it ends up having way more method calls to get state from the child classes. That splits state that should be managed by a single class across multiple places (e.g. buffer growth) and could introduce unnecessary dispatch costs due to the calls. We agreed that it is cleaner to have some code duplication instead, so I'll close this issue. Thanks for investigating it and working on the prototype, @holdenk!
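A closing sketch of the trade-off described here, using illustrative names rather than the real Iceberg classes:

```java
import java.util.Arrays;

// With an abstract class, growth logic and the state it manages live together:
abstract class AbstractReusable {
  protected Object[] values = new Object[16];

  protected void grow() {
    values = Arrays.copyOf(values, values.length * 2);
  }
}

// With an interface, the same logic must round-trip through accessors on the
// implementing class, splitting buffer management across two places and
// adding a virtual call per access:
interface InterfaceReusable {
  Object[] values();
  void setValues(Object[] newValues);

  default void grow() {
    setValues(Arrays.copyOf(values(), values().length * 2));
  }
}
```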