Skip to content

Commit f6ab57d

Browse files
authored
Fix broken implementation of ParTaskImpl::tasksFromIterable (linkedin#347)
The `tasksFromIterable` method in `ParTaskImpl` needs to read the contents of an iterator and then create an array of the read tasks. This is an unfortunate performance/memory tradeoff of using these two things: iterators don't have a length and arrays need one. The implementation reads the iterator's contents into a collection and then turns that collection into an array. The problem is, it then tries to cast that array of `Object` into a more narrow type, which is not allowed. This _always_ results in a ClassCastException if the value is non null. The exists tests did not cover this case. The code has been fixed to take a less performant path and delegate the array creation to `tasksFromCollection`. A test has been written for the `Iterable` code path and verified through coverage analysis.
1 parent 53ba29d commit f6ab57d

File tree

2 files changed

+49
-3
lines changed

2 files changed

+49
-3
lines changed

subprojects/parseq/src/main/java/com/linkedin/parseq/ParTaskImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private Task<? extends Task<? extends T>>[] tasksFromIterable(Iterable<? extends
7373
final Task<T> coercedTask = (Task<T>) task;
7474
taskList.add(coercedTask);
7575
}
76-
return (Task<? extends Task<? extends T>>[]) taskList.toArray();
76+
return tasksFromCollection(taskList);
7777
}
7878

7979
@SuppressWarnings("unchecked")

subprojects/parseq/src/test/java/com/linkedin/parseq/TestParTask.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import java.util.ArrayList;
99
import java.util.Arrays;
1010
import java.util.Collections;
11+
import java.util.Iterator;
1112
import java.util.List;
13+
import java.util.NoSuchElementException;
1214
import java.util.concurrent.CountDownLatch;
1315
import java.util.concurrent.TimeUnit;
1416
import java.util.concurrent.atomic.AtomicInteger;
@@ -25,6 +27,25 @@
2527
* @author Chris Pettitt
2628
*/
2729
public class TestParTask extends BaseEngineTest {
30+
/**
31+
* A helper to create an {@link Iterable} that does not also implement java.util.Collection.
32+
*
33+
* @param tasks an array of tasks to return from the iterable
34+
* @return an iterable over provided array of tasks
35+
* @param <T> the type of task
36+
*/
37+
private static <T> Iterable<Task<? extends T>> asIterable(final Task<? extends T>[] tasks) {
38+
return () -> new Iterator<Task<? extends T>>() {
39+
int current = 0;
40+
public boolean hasNext() { return current < tasks.length; }
41+
public Task<? extends T> next() {
42+
if (!hasNext()) { throw new NoSuchElementException(); }
43+
return tasks[current++];
44+
}
45+
public void remove() { throw new IllegalStateException("Not implemented for tests."); }
46+
};
47+
}
48+
2849
@Test
2950
public void testIterableParWithEmptyList() {
3051
try {
@@ -48,6 +69,31 @@ public void testIterableParWithSingletonList() throws InterruptedException {
4869
assertEquals(valueStr, task.get());
4970
}
5071

72+
@Test
73+
public void testCollectionSeqWithMultipleElements() throws InterruptedException {
74+
final int iters = 500;
75+
76+
final Task<?>[] tasks = new BaseTask<?>[iters];
77+
final AtomicInteger counter = new AtomicInteger(0);
78+
for (int i = 0; i < iters; i++) {
79+
tasks[i] = Task.action("task-" + i, () -> {
80+
// Note: We intentionally do not use CAS. We guarantee that
81+
// the run method of Tasks are never executed in parallel.
82+
final int currentCount = counter.get();
83+
counter.set(currentCount + 1);
84+
} );
85+
}
86+
87+
final ParTask<?> par = par(Arrays.asList(tasks)); // The returned object implements Collection.
88+
89+
runAndWait("TestParTask.testIterableSeqWithMultipleElements", par);
90+
91+
assertEquals(500, par.getSuccessful().size());
92+
assertEquals(500, par.getTasks().size());
93+
assertEquals(500, par.get().size());
94+
assertEquals(500, counter.get());
95+
}
96+
5197
@Test
5298
public void testIterableSeqWithMultipleElements() throws InterruptedException {
5399
final int iters = 500;
@@ -63,7 +109,7 @@ public void testIterableSeqWithMultipleElements() throws InterruptedException {
63109
} );
64110
}
65111

66-
final ParTask<?> par = par(Arrays.asList(tasks));
112+
final ParTask<?> par = par(asIterable(tasks));
67113

68114
runAndWait("TestParTask.testIterableSeqWithMultipleElements", par);
69115

@@ -77,7 +123,7 @@ public void testIterableSeqWithMultipleElements() throws InterruptedException {
77123
@Test
78124
public void testAsyncTasksInPar() throws InterruptedException {
79125
// Tasks cannot have their run methods invoked at the same time, however
80-
// asynchronous tasks are allowed to execute concurrently outside of their
126+
// asynchronous tasks are allowed to execute concurrently outside their
81127
// run methods. This test verifies that two asynchronous tasks are not
82128
// serialized such that one must complete before the other.
83129

0 commit comments

Comments
 (0)