API: Refactor FileScanTask #5077
@@ -29,7 +29,7 @@
  * Scan objects are immutable and can be shared between threads. Refinement methods, like
  * {@link #select(Collection)} and {@link #filter(Expression)}, create new TableScan instances.
  */
-public interface Scan<T extends Scan<T>> {
+public interface Scan<ThisT, T extends ScanTask, S extends InputSplit<T>> {
I am debating whether we need to attach a bound to ThisT. It won't harm but we don't do that in any other places and it makes the definition a bit more cumbersome as Scan has 3 params now.
This is still open.
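To make the trade-off concrete, here is a minimal sketch of a self type with and without a bound. All names in it (SelfTypeDemo, UnboundedScan, BoundedScan, TableScanA, TableScanB) are hypothetical and are not the interfaces in this PR. It only illustrates that method chaining works the same either way, and that the bound merely rejects nonsensical parameterizations at compile time.

```java
class SelfTypeDemo {
  // Unbounded self type: nothing stops a nonsensical UnboundedScan<String>.
  interface UnboundedScan<ThisT> {
    ThisT filter(String expr);
  }

  // Bounded self type: ThisT must itself be a BoundedScan.
  interface BoundedScan<ThisT extends BoundedScan<ThisT>> {
    ThisT filter(String expr);
  }

  static final class TableScanA implements UnboundedScan<TableScanA> {
    final String filters;

    TableScanA(String filters) {
      this.filters = filters;
    }

    @Override
    public TableScanA filter(String expr) {
      // Refinement returns a new immutable instance.
      return new TableScanA(filters + "[" + expr + "]");
    }
  }

  static final class TableScanB implements BoundedScan<TableScanB> {
    final String filters;

    TableScanB(String filters) {
      this.filters = filters;
    }

    @Override
    public TableScanB filter(String expr) {
      return new TableScanB(filters + "[" + expr + "]");
    }
  }

  static String chainUnbounded() {
    return new TableScanA("").filter("a").filter("b").filters;
  }

  static String chainBounded() {
    return new TableScanB("").filter("a").filter("b").filters;
  }

  public static void main(String[] args) {
    System.out.println(chainUnbounded());
    System.out.println(chainBounded());
  }
}
```

Both chains behave identically for well-formed implementations, which is consistent with the point that the bound would not harm but mostly adds verbosity.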
@@ -217,6 +218,18 @@ public Iterable<FileScanTask> split(long splitSize) {
      throw new UnsupportedOperationException("Cannot split a task which is already split");
    }

    @Override
    public boolean isAdjacent(FileScanTask other) {
      return file().equals(other.file()) && offset + len == other.start();
I copied this logic from an existing place but we have to remember that files will be equal only if references are equal, which I think is true here. Our data and delete file implementations don't override equals.
I think this is correct.
In the other code we assume there is an ordering of the files:
1 is adjacent to 2
2 is not adjacent to 1
We may want to change the API definition to note that it is not commutative.
What about updating the code to cover both in the future?
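The asymmetry can be seen in a small sketch. The Range class below is a hypothetical stand-in for a split task, not the Iceberg implementation. It reuses the same check as the diff (same file, and this range ends exactly where the other starts) and relies on default reference equality for the file object, as noted earlier in this thread.

```java
class AdjacencyDemo {
  // Hypothetical stand-in for a split of a file; not an Iceberg class.
  static final class Range {
    final Object file; // compared with equals(), which is reference equality here
    final long start;
    final long length;

    Range(Object file, long start, long length) {
      this.file = file;
      this.start = start;
      this.length = length;
    }

    // True only when `other` begins exactly where this range ends, in the same file.
    boolean isAdjacent(Range other) {
      return file.equals(other.file) && start + length == other.start;
    }
  }

  // Returns {first.isAdjacent(second), second.isAdjacent(first)}.
  static boolean[] check() {
    Object file = new Object();
    Range first = new Range(file, 0, 100);
    Range second = new Range(file, 100, 50);
    return new boolean[] {first.isAdjacent(second), second.isAdjacent(first)};
  }

  public static void main(String[] args) {
    boolean[] result = check();
    System.out.println("first -> second: " + result[0]);
    System.out.println("second -> first: " + result[1]);
  }
}
```

A symmetric variant would have to test both directions explicitly, which is what covering both cases in the future would amount to.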
package org.apache.iceberg;

public interface SplittableScanTask<ThisT> extends ScanTask {
I like the addition of SplittableScanTask. This looks pretty good.
Overall I think this looks good. Are you happy with the direction this is going, @aokolnychyi?
@rdblue, I spent quite some time looking at it so I am no longer sure :) We need to sort out some details but it does unlock having tasks that are not
package org.apache.iceberg;

public interface SplittableScanTask<ThisT> extends ScanTask {
Do we need this to be SplittableScanTask, since I think our only usage is in combining? I do like the idea here, though. I just wonder if we will actually need to re-split or if we should always be producing minimally sized splits (one row group) and then combining them.
Avro is still an issue, I guess ...
We actually leverage split before combining in TableScanUtil.
Avro is still splittable, right? I am not sure whether we persist split offsets in the metadata so planning may not be as optimal as for Parquet.
Our Avro code, I believe, just splits wherever the offsets fall. I just meant the difference between row-group splittable and generically splittable.
Left some comments, my biggest questions are about some of our API contracts. Just want to make sure we are clear on what we expect as args and what we expect to return.
Generally looks good, added a few comments.
There may be quite some code to change to adopt these new ways to split/combine tasks?
  public BaseScanTaskGroup(List<T> tasks) {
    Preconditions.checkNotNull(tasks, "tasks cannot be null");
    this.tasks = Lists.newArrayList(tasks);
Just use ImmutableList.of() and then return it directly in tasks()?
This is on purpose to avoid Kryo serialization issues. We usually rely on arrays but generics complicate things. I'll need to pass around Class<T> to make it work with arrays. I added tests to make sure Kryo works with mutable lists. I think that's also true for Flink but I can switch to arrays if mutable lists are a problem.
What about using this?
List<T> asList = Lists.newArrayList(tasks);
Preconditions.checkArgument(asList.size() > 0, "...");
this.taskArray = (T[]) Array.newInstance(asList.get(0).getClass(), asList.size());
@rdblue, I am not sure this will be safe, unfortunately. If we are to support lists with multiple task types, we can't assume the rest of the list has the same type as the first element. We may end up with an array store exception at runtime.
Suppose we have List<ParentTask> with two elements of type ChildTask1 and ChildTask2. If we create an array of type ChildTask1, we won't be able to store ChildTask2 in it (even if we cast the array to the parent interface). It will compile but probably fail at runtime.
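A minimal sketch of that failure mode, using the ParentTask, ChildTask1 and ChildTask2 names from the comment as hypothetical placeholder types. It builds the array from the runtime class of the first element, as in the suggestion above, and shows that storing the second element throws ArrayStoreException, while a plain Object[] accepts both.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

class ArrayStoreDemo {
  // Hypothetical placeholder types mirroring the names used in the comment.
  interface ParentTask {}

  static final class ChildTask1 implements ParentTask {}

  static final class ChildTask2 implements ParentTask {}

  // Returns true if copying into an array typed after the first element fails.
  @SuppressWarnings("unchecked")
  static <T> boolean failsWithFirstElementType(List<T> tasks) {
    // The runtime component type is ChildTask1 even though T is ParentTask.
    T[] array = (T[]) Array.newInstance(tasks.get(0).getClass(), tasks.size());
    try {
      for (int i = 0; i < tasks.size(); i++) {
        array[i] = tasks.get(i); // the JVM checks the runtime component type here
      }
      return false;
    } catch (ArrayStoreException e) {
      return true;
    }
  }

  // Returns true if every element can be stored in an Object[].
  static boolean fitsInObjectArray(List<?> tasks) {
    Object[] array = new Object[tasks.size()];
    for (int i = 0; i < tasks.size(); i++) {
      array[i] = tasks.get(i);
    }
    return array.length == tasks.size();
  }

  // Returns {failsWithFirstElementType, fitsInObjectArray} for a mixed list.
  static boolean[] demo() {
    List<ParentTask> tasks = Arrays.<ParentTask>asList(new ChildTask1(), new ChildTask2());
    return new boolean[] {failsWithFirstElementType(tasks), fitsInObjectArray(tasks)};
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("typed array fails: " + result[0]);
    System.out.println("Object[] works: " + result[1]);
  }
}
```

The unchecked cast to T[] compiles and succeeds at runtime because of erasure. The failure only surfaces on the store.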
We could just use Object[] then?
Yeah, we can use an object array. I also added a transient list to avoid building a list on each call.
LGTM
    CombinableScanTask<? extends T> lastCombinableTask = null;

    for (T task : tasks) {
      if (task instanceof CombinableScanTask<?>) {
I don't think this is correct. It doesn't matter if the next task is combinable. It only matters if the last task was. And we can't keep around the last combinable task because then we would possibly combine tasks out of order.
I think lastCombinableTask should be lastTask, and this should check whether lastTask is combinable in order to try combining it with the current task. The new task, if not combined, should always be set as the new lastTask.
You are right. The old logic assumed we can combine only tasks of the same type so this logic worked. If we decide to go this route and have CombinableScanTask, I'll adapt. I didn't want to change this logic completely before discussing CombinableScanTask.
I updated.
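For reference, the lastTask approach suggested in this thread could look roughly like the sketch below. This is not the code in the PR. Task, Mergeable, Split, Opaque, canMerge and merge are hypothetical names chosen for illustration. The loop only ever tries to merge the current task into the immediately preceding one, so tasks are never combined across an intervening non-mergeable task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeDemo {
  interface Task {}

  // Hypothetical mergeable contract; names are assumptions, not the PR's API.
  interface Mergeable extends Task {
    boolean canMerge(Task other);

    Task merge(Task other);
  }

  static final class Split implements Mergeable {
    final long start;
    final long length;

    Split(long start, long length) {
      this.start = start;
      this.length = length;
    }

    @Override
    public boolean canMerge(Task other) {
      return other instanceof Split && start + length == ((Split) other).start;
    }

    @Override
    public Task merge(Task other) {
      return new Split(start, length + ((Split) other).length);
    }
  }

  // A task type that can never be merged.
  static final class Opaque implements Task {}

  static List<Task> mergeTasks(List<Task> tasks) {
    List<Task> merged = new ArrayList<>();
    Task lastTask = null;

    for (Task task : tasks) {
      if (lastTask instanceof Mergeable && ((Mergeable) lastTask).canMerge(task)) {
        // Only the immediately preceding task is a merge candidate.
        lastTask = ((Mergeable) lastTask).merge(task);
      } else {
        if (lastTask != null) {
          merged.add(lastTask);
        }
        // A task that was not merged always becomes the new lastTask.
        lastTask = task;
      }
    }

    if (lastTask != null) {
      merged.add(lastTask);
    }
    return merged;
  }

  static List<Task> sample() {
    return Arrays.<Task>asList(new Split(0, 10), new Split(10, 5), new Opaque(), new Split(15, 5));
  }

  public static void main(String[] args) {
    System.out.println(mergeTasks(sample()).size() + " tasks after merging");
  }
}
```

In the sample input the first two splits merge, while the last split stays separate because an unmergeable task sits between it and the merged one, preserving the original order.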
 *
 * @param <ThisT> the child Java API class
 */
public interface MergeableScanTask<ThisT> extends ScanTask {
I kept it separate from SplittableScanTask as FileScanTask is splittable but SplitScanTask is mergeable.
@@ -88,4 +94,109 @@ public void testPlanTaskWithDeleteFiles() {
          expectedCombinedTasks.get(i).files(), combinedScanTasks.get(i).files());
    }
  }

  @Test
  public void testTaskGroupPlanning() {
These are tests where only some tasks are splittable and mergeable.
  @SuppressWarnings("unchecked")
  public Collection<T> tasks() {
    if (taskList == null) {
      synchronized (this) {
I don't have a problem with this, but do you really expect this to be accessed from different threads after construction?
I did it just in case, given how hard it would be to debug such issues. Better safe than sorry :)
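Putting the two decisions from this review together (an Object[] for serialization-friendly storage, plus a transient list built lazily under a lock), a sketch might look like the following. TaskGroupDemo is a hypothetical class, not BaseScanTaskGroup. The volatile modifier is an assumption added here because double-checked locking needs it to be safe, and the visible diff does not show the field declaration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class TaskGroupDemo<T> implements Serializable {
  // An Object[] avoids needing Class<T> and tolerates mixed task types.
  private final Object[] tasks;

  // Not serialized; rebuilt on first access. volatile is an assumption of this sketch.
  private transient volatile List<T> taskList;

  TaskGroupDemo(Collection<? extends T> tasks) {
    if (tasks == null) {
      throw new NullPointerException("tasks cannot be null");
    }
    this.tasks = tasks.toArray(new Object[0]);
  }

  @SuppressWarnings("unchecked")
  Collection<T> tasks() {
    if (taskList == null) {
      synchronized (this) {
        if (taskList == null) {
          List<T> list = new ArrayList<>(tasks.length);
          for (Object task : tasks) {
            list.add((T) task);
          }
          taskList = Collections.unmodifiableList(list);
        }
      }
    }
    return taskList;
  }

  public static void main(String[] args) {
    TaskGroupDemo<String> group = new TaskGroupDemo<>(java.util.Arrays.asList("a", "b"));
    System.out.println(group.tasks());
  }
}
```

Repeated calls return the same cached list, so the list is built at most once per instance, or once more after deserialization since the field is transient.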
Looks good to me! Merge when you're ready. Do we also want to deprecate
I haven't made up my mind on deprecating
Thanks for reviewing, @RussellSpitzer @szehon-ho @flyrain @rdblue! I know it was a tricky one.
Let's not then. Not worth it.