Skip to content

Commit 5aa82cc

Browse files
committed
added the basic API for dealing with Fork/Join in a convenient way
added PI estimation using the Monte-Carlo method as an example on how to use that API
1 parent b0a0c21 commit 5aa82cc

File tree

11 files changed

+212
-67
lines changed

11 files changed

+212
-67
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
target/
2+
*.iml

pom.xml

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,37 @@
11
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3-
<modelVersion>4.0.0</modelVersion>
4-
<groupId>com.mgu.parallel</groupId>
5-
<artifactId>parallelism-in-java</artifactId>
6-
<packaging>jar</packaging>
7-
<version>1.0-SNAPSHOT</version>
8-
<name>parallelism-in-java</name>
9-
<url>http://maven.apache.org</url>
10-
<dependencies>
11-
<dependency>
12-
<groupId>junit</groupId>
13-
<artifactId>junit</artifactId>
14-
<version>3.8.1</version>
15-
<scope>test</scope>
16-
</dependency>
17-
</dependencies>
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
4+
<modelVersion>4.0.0</modelVersion>
5+
<groupId>com.mgu.parallel</groupId>
6+
<artifactId>parallelism-in-java</artifactId>
7+
<packaging>jar</packaging>
8+
<version>0.1.0-SNAPSHOT</version>
9+
<name>parallelism-in-java</name>
10+
11+
<properties>
12+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13+
</properties>
14+
15+
<build>
16+
<plugins>
17+
<plugin>
18+
<groupId>org.apache.maven.plugins</groupId>
19+
<artifactId>maven-compiler-plugin</artifactId>
20+
<version>3.1</version>
21+
<configuration>
22+
<source>1.8</source>
23+
<target>1.8</target>
24+
</configuration>
25+
</plugin>
26+
</plugins>
27+
</build>
28+
29+
<dependencies>
30+
<dependency>
31+
<groupId>junit</groupId>
32+
<artifactId>junit</artifactId>
33+
<version>4.11</version>
34+
<scope>test</scope>
35+
</dependency>
36+
</dependencies>
1837
</project>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.mgu.parallel;
2+
3+
import java.util.concurrent.ForkJoinTask;
4+
import java.util.function.Supplier;
5+
6+
abstract public class AbstractTaskScheduler implements TaskScheduler {
7+
8+
@Override
9+
public <A, B> Tuple2<A, B> parallel(final Supplier<A> taskA, final Supplier<B> taskB) {
10+
final ForkJoinTask<B> right = schedule(taskB);
11+
return new Tuple2<A, B>(taskA.get(), right.join());
12+
}
13+
}

src/main/java/com/mgu/parallel/App.java

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.mgu.parallel;
2+
3+
import java.util.concurrent.ForkJoinPool;
4+
import java.util.concurrent.ForkJoinTask;
5+
import java.util.concurrent.ForkJoinWorkerThread;
6+
import java.util.concurrent.RecursiveTask;
7+
import java.util.function.Supplier;
8+
9+
public class DefaultTaskScheduler extends AbstractTaskScheduler {
10+
11+
private final ForkJoinPool forkJoinPool;
12+
13+
public DefaultTaskScheduler() {
14+
this(ForkJoinPool.commonPool());
15+
}
16+
17+
public DefaultTaskScheduler(final ForkJoinPool forkJoinPool) {
18+
this.forkJoinPool = forkJoinPool;
19+
}
20+
21+
@Override
22+
public <T> ForkJoinTask<T> schedule(final Supplier<T> body) {
23+
24+
final RecursiveTask<T> task = new RecursiveTask<T>() {
25+
@Override
26+
protected T compute() {
27+
return body.get();
28+
}
29+
};
30+
31+
if (isForkJoinWorkerThread()) {
32+
task.fork();
33+
} else {
34+
forkJoinPool.execute(task);
35+
}
36+
37+
return task;
38+
}
39+
40+
private boolean isForkJoinWorkerThread() {
41+
return Thread.currentThread() instanceof ForkJoinWorkerThread;
42+
}
43+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.mgu.parallel;
2+
3+
import java.util.concurrent.ExecutionException;
4+
import java.util.concurrent.ForkJoinTask;
5+
import java.util.function.Supplier;
6+
7+
public class Schedulers {
8+
9+
public static final DefaultTaskScheduler SCHEDULER = new DefaultTaskScheduler();
10+
11+
private static <T> ForkJoinTask<T> task(final Supplier<T> body) {
12+
return SCHEDULER.schedule(body);
13+
}
14+
15+
public static <A, B> Tuple2<A, B> parallel(final Supplier<A> taskA, final Supplier<B> taskB) {
16+
return SCHEDULER.parallel(taskA, taskB);
17+
}
18+
19+
public static <A, B, C, D> Tuple4<A, B, C, D> parallel(
20+
final Supplier<A> taskA,
21+
final Supplier<B> taskB,
22+
final Supplier<C> taskC,
23+
final Supplier<D> taskD) throws ExecutionException, InterruptedException {
24+
final ForkJoinTask<A> ta = task(taskA);
25+
final ForkJoinTask<B> tb = task(taskB);
26+
final ForkJoinTask<C> tc = task(taskC);
27+
final ForkJoinTask<D> td = task(taskD);
28+
return new Tuple4<>(ta.join(), tb.join(), tc.join(), td.get());
29+
}
30+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.mgu.parallel;
2+
3+
import java.util.concurrent.ForkJoinTask;
4+
import java.util.function.Supplier;
5+
6+
public interface TaskScheduler {
7+
8+
<T>ForkJoinTask<T> schedule(Supplier<T> body);
9+
10+
<A, B> Tuple2<A, B> parallel(Supplier<A> taskA, Supplier<B> taskB);
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.mgu.parallel;
2+
3+
public class Tuple2<A, B> {
4+
5+
public final A a;
6+
public final B b;
7+
8+
public Tuple2(final A a, final B b) {
9+
this.a = a;
10+
this.b = b;
11+
}
12+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.mgu.parallel;
2+
3+
public class Tuple4<A, B, C, D> {
4+
5+
public final A a;
6+
public final B b;
7+
public final C c;
8+
public final D d;
9+
10+
public Tuple4(final A a, final B b, final C c, final D d) {
11+
this.a = a;
12+
this.b = b;
13+
this.c = c;
14+
this.d = d;
15+
}
16+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.mgu.parallel.pi;
2+
3+
import com.mgu.parallel.Tuple4;
4+
5+
import java.util.Random;
6+
import java.util.concurrent.ExecutionException;
7+
8+
import static com.mgu.parallel.Schedulers.parallel;
9+
10+
public class PiEstimation {
11+
12+
public double computeSequentially(final int iterations) {
13+
return 4.0 * monteCarloCount(iterations) / iterations;
14+
}
15+
16+
public double computeParallel(final int iterations) throws ExecutionException, InterruptedException {
17+
final Tuple4<Integer, Integer, Integer, Integer> result = parallel(
18+
() -> monteCarloCount(iterations/4),
19+
() -> monteCarloCount(iterations/4),
20+
() -> monteCarloCount(iterations/4),
21+
() -> monteCarloCount(iterations-3*iterations/4));
22+
return 4.0 * (result.a + result.b + result.c + result.d) / iterations;
23+
}
24+
25+
private int monteCarloCount(final int iterations) {
26+
final Random randomX = new Random();
27+
final Random randomY = new Random();
28+
int hits = 0;
29+
for (int i = 0; i < iterations; i++) {
30+
final double x = randomX.nextDouble();
31+
final double y = randomY.nextDouble();
32+
if (x*x + y*y < 1) hits++;
33+
}
34+
return hits;
35+
}
36+
37+
public static void main(String[] args) throws ExecutionException, InterruptedException {
38+
final PiEstimation piMonteCarlo = new PiEstimation();
39+
long start = System.nanoTime();
40+
final double piSeq = piMonteCarlo.computeSequentially(50000000);
41+
long end = System.nanoTime();
42+
long duration = (end - start) / 1000000;
43+
System.out.println("Computed PI (" + piSeq + ") sequentially in " + duration + " ms.");
44+
start = System.nanoTime();
45+
final double piPar = piMonteCarlo.computeParallel(50000000);
46+
end = System.nanoTime();
47+
duration = (end - start) / 1000000;
48+
System.out.println("Computed PI (" + piPar + ") parallel in " + duration + " ms.");
49+
}
50+
}

0 commit comments

Comments
 (0)