Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Refactor FileScanTask #5077

Merged
merged 8 commits into from
Jun 28, 2022

Conversation

aokolnychyi
Copy link
Contributor

No description provided.

@aokolnychyi aokolnychyi force-pushed the refactor-file-scan-task branch from 1e61473 to 79699fc Compare June 17, 2022 19:53
@@ -29,7 +29,7 @@
* Scan objects are immutable and can be shared between threads. Refinement methods, like
* {@link #select(Collection)} and {@link #filter(Expression)}, create new TableScan instances.
*/
public interface Scan<T extends Scan<T>> {
public interface Scan<ThisT, T extends ScanTask, S extends InputSplit<T>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am debating whether we need to attach a boundary to ThisT. It won't harm but we don't do that in any other places and it makes the definition a bit more cumbersome as Scan has 3 params now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still open.

@@ -217,6 +218,18 @@ public Iterable<FileScanTask> split(long splitSize) {
throw new UnsupportedOperationException("Cannot split a task which is already split");
}

@Override
public boolean isAdjacent(FileScanTask other) {
return file().equals(other.file()) && offset + len == other.start();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this logic from an existing place but we have to remember that files will be equal only if references are equal, which I think is true here. Our data and delete file implementations don't override equals.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other code we assume there is an ordering of the files.

1 is adjacent to 2
2 is not adjacent to 1

We may want to change the api definition just to note that it is not commutative

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about updating the code to cover both in the future?


package org.apache.iceberg;

public interface SplittableScanTask<ThisT> extends ScanTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the addition of SplittableScanTask. This looks pretty good.

@rdblue
Copy link
Contributor

rdblue commented Jun 17, 2022

Overall I think this looks good. Are you happy with the direction this is going, @aokolnychyi?

@aokolnychyi
Copy link
Contributor Author

aokolnychyi commented Jun 17, 2022

@rdblue, I spent quite some time looking at it so I am no longer sure :) We need to sort out some details but it does unlock having tasks that are not FileScanTask and also parametrizes our scans. It is probably worth it.


package org.apache.iceberg;

public interface SplittableScanTask<ThisT> extends ScanTask {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this to be SplittableScanTask since I think our only usage is in combining? I do like the idea here though I just wonder if we will actually need to re-split or if we should always be producing minimally sized splits (one row group) and then combining them?

Avro is still an issue I guess ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually leverage split before combining in TableScanUtil.

Avro is still splittable, right? I am not sure whether we persist split offsets in the metadata so planning may not be as optimal as for Parquet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our Avro code I believe just splits wherever the offsets fall. I just meant the difference between row-group splittable and generically splittable

Copy link
Member

@RussellSpitzer RussellSpitzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, my biggest questions are about some of our API contracts. Just want to make sure we are clear on what we expect as args and what we expect to return.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good, added a few comments.

There may be quite some code to change to adopt these new ways to split/combine tasks?


public BaseScanTaskGroup(List<T> tasks) {
Preconditions.checkNotNull(tasks, "tasks cannot be null");
this.tasks = Lists.newArrayList(tasks);
Copy link
Collaborator

@szehon-ho szehon-ho Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use ImmutableList.of() and then return it directly in tasks()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is on purpose to avoid Kryo serialization issues. We usually rely on arrays but generics complicate things. I'll need to pass around Class<T> to make it work with arrays. I added tests to make sure Kryo works with mutable lists. I think that's also true for Flink but I can switch to arrays if mutable lists are a problem.

Copy link
Contributor

@rdblue rdblue Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using this?

    List<T> asList = Lists.newArrayList(tasks);
    Preconditions.checkArgument(asList.size() > 0, "...");
    this.taskArray = (T[]) Array.newInstance(asList.get(0).getClass(), asList.size());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue, I am not sure this will be safe, unfortunately. If we are to support lists with multiple task types, we can't assume the rest of the list has the same type as the first element. We may end up with an array store exception at runtime.

Suppose we have List<ParentTask> with two elements of type ChildTask1 and ChildTask2. If we create an array of type ChildTask1, we won't be able to store ChildTask2 in it (even if we cast the array to the parent interface). It will compile but probably fail at runtime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just use Object[] then?

Copy link
Contributor Author

@aokolnychyi aokolnychyi Jun 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can use an object array. I also added a transient list to avoid building a list on each call.

@github-actions github-actions bot added the spark label Jun 22, 2022
@flyrain
Copy link
Contributor

flyrain commented Jun 22, 2022

LGTM

CombinableScanTask<? extends T> lastCombinableTask = null;

for (T task : tasks) {
if (task instanceof CombinableScanTask<?>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct. It doesn't matter if the next task is combineable. It only matters if the last task was. And we can't keep around the last combineable task because then we would possibly combine tasks out of order.

I think lastCombineableTask should be lastTask and this should check whether lastTask is a combineable in order to try combining with the current task. The new task, if not combined, should always be set as the new lastTask.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. The old logic assumed we can combine only tasks of the same type so this logic worked. If we decide to go this route and have CombinableScanTask, I'll adapt. I didn't want to change this logic completely before discussing CombinableScanTask.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated.

*
* @param <ThisT> the child Java API class
*/
public interface MergeableScanTask<ThisT> extends ScanTask {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it separate from SplittableScanTask as FileScanTask is splittable but SplitScanTask is mergable.

@@ -88,4 +94,109 @@ public void testPlanTaskWithDeleteFiles() {
expectedCombinedTasks.get(i).files(), combinedScanTasks.get(i).files());
}
}

@Test
public void testTaskGroupPlanning() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are tests where only some tasks are splittable and mergable.

@SuppressWarnings("unchecked")
public Collection<T> tasks() {
if (taskList == null) {
synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a problem with this, but do you really expect this to be accessed from different threads after construction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it just in case given how hard it would be debug such issues. Better be safe than sorry :)

@rdblue
Copy link
Contributor

rdblue commented Jun 28, 2022

Looks good to me! Merge when you're ready.

Do we also want to deprecate CombinedScanTask so we can just use the group after 1.0?

@aokolnychyi
Copy link
Contributor Author

I haven't made my mind on deprecating CombinedScanTask cause it is so widely used...
We will also have to break TableScan if we switch to ScanTaskGroup.

@aokolnychyi aokolnychyi merged commit 23b97c1 into apache:master Jun 28, 2022
@aokolnychyi
Copy link
Contributor Author

Thanks for reviewing, @RussellSpitzer @szehon-ho @flyrain @rdblue! I know it was a tricky one.

@rdblue
Copy link
Contributor

rdblue commented Jun 28, 2022

I haven't made my mind on deprecating CombinedScanTask cause it is so widely used...
We will also have to break TableScan if we switch to ScanTaskGroup.

Let's not then. Not worth it.

@rdblue rdblue added this to the Iceberg 0.14.0 Release milestone Jun 28, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants