Refactor reusable map data in Spark and Flink parquet readers #1331 #1572

Conversation

@holdenk (Contributor) commented Oct 9, 2020

No description provided.


package org.apache.iceberg.parquet;

public interface ReusableArrayData {
Contributor

Was it necessary to make this an interface? Because this does allocation, it seems much harder to make it an interface than to make it an abstract class that handles allocation and the values array internally.
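For reference, a rough sketch of the abstract-class shape being suggested here (hypothetical names and sizes, not code from this PR): the base class would own the backing array, growth, and element count itself.

// Hypothetical sketch only: an abstract base class that owns allocation and growth,
// so subclasses never touch the backing array directly.
abstract class ReusableArrayDataBase {
  private Object[] values = new Object[16];
  private int numElements = 0;

  int capacity() {
    return values.length;
  }

  void grow() {
    // Double the backing array, preserving existing elements.
    Object[] bigger = new Object[values.length * 2];
    System.arraycopy(values, 0, bigger, 0, values.length);
    values = bigger;
  }

  void update(int pos, Object value) {
    values[pos] = value;
  }

  Object get(int pos) {
    return values[pos];
  }

  void setNumElements(int newSize) {
    this.numElements = newSize;
  }

  int numElements() {
    return numElements;
  }
}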

Contributor Author

Yeah, the problem is Spark's base class is an abstract class and Flink's is an interface :/
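In other words (a hypothetical, stripped-down illustration; SparkBase and FlinkBase are stand-ins, not the real Spark/Flink types): a Java class gets only one superclass, so shared behavior that must combine with Spark's abstract base class can only arrive through an interface.

// Hypothetical stand-ins for the engine base types.
abstract class SparkBase {          // Spark's base is an abstract class
  public abstract int numElements();
}

interface FlinkBase {               // Flink's base is an interface
  int size();
}

// The shared type has to be an interface: the Spark-side class already spends
// its single "extends" slot on SparkBase.
interface SharedReusable {
  int capacity();
}

class SparkSide extends SparkBase implements SharedReusable {
  @Override
  public int numElements() {
    return 0;
  }

  @Override
  public int capacity() {
    return 0;
  }
}

class FlinkSide implements FlinkBase, SharedReusable {
  @Override
  public int size() {
    return 0;
  }

  @Override
  public int capacity() {
    return 0;
  }
}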

Contributor Author

I was thinking about this today. Originally I had assumed that I could not have the Flink one inherit from Spark because Flink & Spark wouldn't want to take dependencies on each other. But since the Flink one is an interface, I'll give it a shot and see if I can make this an abstract class.

Contributor Author

Circled back, and yeah I think this does have to be an interface :/

Contributor

Yes, I think so as well. That just means I'll need to be a bit more careful with the review since it is more complicated than I thought. I should have time today, hopefully.

Member

Is it possible to design ReusableArrayData as a generic class, ReusableArrayData<T>?

Then FlinkReusableArrayData and SparkReusableArrayData would each have a private ReusableArrayData<Object> member, and both of them could delegate methods such as capacity(), setNumElements(), and getNumElements() to it.

Making ReusableArrayData an interface confuses me a bit; to me it should have a complete implementation, even if its element type is generic.
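A hypothetical sketch of the composition idea being proposed (invented names, not code from this PR): one concrete generic holder carries the buffer, and each engine-specific class wraps it and forwards calls.

// Hypothetical sketch of the delegation approach: a single concrete generic class
// owns the reusable buffer and its bookkeeping.
class ReusableArrayHolder<T> {
  private Object[] values = new Object[16];
  private int numElements = 0;

  int capacity() {
    return values.length;
  }

  void setNumElements(int newSize) {
    this.numElements = newSize;
  }

  int getNumElements() {
    return numElements;
  }

  void update(int pos, T value) {
    values[pos] = value;
  }

  @SuppressWarnings("unchecked")
  T get(int pos) {
    return (T) values[pos];
  }
}

// Each engine-specific class keeps whatever superclass/interface its engine requires
// and delegates the buffer bookkeeping to the holder.
class SparkStyleReusableArrayData {
  private final ReusableArrayHolder<Object> holder = new ReusableArrayHolder<>();

  public int numElements() {
    return holder.getNumElements();
  }
}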

Contributor Author

I don't think so, since we can't do multiple inheritance with a class.

    }
    keys().update(size(), key);
    values().update(size(), value);
    setNumElements(size() + 1);
Contributor

Does this need to be set every time a new pair is added? I think it would be better to set the valid size just once, before returning, to avoid the extra work.

@kbendick (Contributor) commented Oct 12, 2020

I might be misinterpreting your question/concern, but it seems like at least in SparkReusableMapData this is the case.

If I'm reading this correctly, addPair is overridden and setNumElements is only called in buildMap. Again, I might be misinterpreting the question/concern, but for a default implementation this seems fine, given that it is overridden on the Spark side and on the Flink side too (where there's a buildMap function).

private static class MapReader<K, V> extends RepeatedKeyValueReader<MapData, SparkReusableMapData, K, V> {
    // Reduced to the bare minimum ...
    @Override
    protected void addPair(SparkReusableMapData map, K key, V value) {
      if (writePos >= map.capacity()) {
        map.grow();
      }

      map.keys.values[writePos] = key;
      map.values.values[writePos] = value;

      writePos += 1;
    }

    @Override
    protected MapData buildMap(SparkReusableMapData map) {
      map.setNumElements(writePos);
      return map;
    }

Contributor

Yes, my point is that we don't need to set the number of valid elements until the end, just before MapData is returned.

Contributor Author

So yes, we could do that, but then we'd be keeping track of the write position in the MapReader, so this approach just moves the grow code into the base class. I can swap it back around, though.

Contributor Author

Oh right, because this is an interface we need to call a method anyway, since we can't have a concrete field in the interface.
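A minimal sketch of the point being made, assuming hypothetical names: a Java interface cannot hold instance fields, so a shared default method has to reach all state through accessor methods on the implementing class.

// Hypothetical sketch: the shared addPair logic can live in a default method,
// but every piece of state (capacity, size, the arrays) sits behind a method call.
interface ArrayBuffer {
  int capacity();
  void grow();
  void update(int pos, Object value);
}

interface MapBuffer {
  ArrayBuffer keys();
  ArrayBuffer values();
  int size();
  void setNumElements(int newSize);

  default void addPair(Object key, Object value) {
    if (size() >= keys().capacity()) {
      keys().grow();
      values().grow();
    }
    keys().update(size(), key);
    values().update(size(), value);
    setNumElements(size() + 1);
  }
}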

@@ -655,6 +653,11 @@ public int numElements() {
    return numElements;
  }

  @Override
  public int getNumElements() {
Contributor

Before, this was called numElements. Could you revert the name change?

Contributor

It looks like the function numElements is still there, just above this one

@kbendick (Contributor) commented Oct 10, 2020

It looks like the function numElements is still there (it's part of the catalyst abstract class), in the lines just above these ones.

But unless there's something going on with Spark, I agree with @rdblue.

I looked to see whether getNumElements was needed on the Flink side, since Flink strongly favors POJOs for performance reasons in its own serialization (which it can infer), and for style reasons it also tends to use POJO-style names even when they aren't strictly needed. But the Flink ArrayData interface uses size instead, so having the Iceberg ReusableArrayData interface use numElements seems like it would kill two birds with one stone by also fulfilling the catalyst contract.

But maybe there's something we're not seeing.

Contributor Author

Yeah, this is a good point. I'll simplify the interface so we don't have both numElements and getNumElements.

@kbendick (Contributor)

This is an important task, so thank you for taking it on, @holdenk.

@holdenk (Contributor Author) commented Oct 14, 2020

Thanks y'all for the reviews. Sorry I'm a little slow responding to this; I'm currently dealing with a race condition inside some new Spark code, but I'm hoping to circle back to this PR before the end of the week.

  protected RowData buildStruct(GenericRowData struct) {
    return struct;
  }

  private static class FlinkReusableMapData implements ReusableMapData, MapData {
Contributor

Was the order of classes changed? It looks like that may be why there are more changes in this diff.

Contributor Author

Let me see if I can minimize the changes in the diff this weekend.

    values().setNumElements(numElements);
  }

  int size();
Member

nit: how about making this method return keys().getNumElements() by default? Then we wouldn't have to implement it in the subclasses.
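For clarity, a sketch of the default-method shape being suggested (hypothetical names); as the reply below notes, this doesn't actually work because size() is already required by the Spark/Flink parent type.

// Hypothetical sketch of the suggestion: size() delegating to the keys array by default.
interface ArrayBuffer {
  int getNumElements();
}

interface MapBuffer {
  ArrayBuffer keys();

  default int size() {
    return keys().getNumElements();
  }
}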

Contributor Author

I tried that; it doesn't work since size is required by the Spark/Flink class/interface.

@holdenk force-pushed the refactor-ReusableMapData-in-spark-and-flink-parquet-readers-1331 branch from c62215b to d7c33c5 on December 16, 2020 19:41
@rdblue (Contributor) commented Dec 17, 2020

I just talked with @holdenk directly about this and we agreed that it probably isn't the right direction to go because the shared code needs to be carried by an interface instead of an abstract class. The interface can't handle its own state, so it ends up having way more method calls to get state from the child classes. That splits state that should be managed by a single class across multiple places (e.g. buffer growth) and could introduce unnecessary dispatch costs due to the calls.

We agreed that it is cleaner to have some code duplication instead, so I'll close this issue. Thanks for investigating it and working on the prototype, @holdenk!

@rdblue closed this Dec 17, 2020