Skip to content

Commit 8380c7f

Browse files
author
Jano
committed
refactored task scavenging mechanism
1 parent 0434b8c commit 8380c7f

File tree

2 files changed

+34
-47
lines changed

2 files changed

+34
-47
lines changed

src/main/java/com/jano7/executor/KeySequentialRunner.java

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ of this software and associated documentation files (the "Software"), to deal
2323
*/
2424
package com.jano7.executor;
2525

26-
import java.util.ArrayList;
2726
import java.util.HashMap;
2827
import java.util.LinkedList;
2928
import java.util.concurrent.Executor;
@@ -33,18 +32,39 @@ public final class KeySequentialRunner<Key> {
3332

3433
private final class KeyRunner {
3534

36-
private final LinkedList<Runnable> tasks = new LinkedList<>();
37-
private boolean active = false;
35+
private final Key key;
36+
private LinkedList<Runnable> tasks;
3837

39-
public synchronized void run(Runnable task) {
40-
if (active) {
41-
tasks.addFirst(task);
42-
} else {
43-
active = true;
38+
KeyRunner(Key key) {
39+
this.key = key;
40+
}
41+
42+
synchronized void run(Runnable task) {
43+
if (tasks == null) {
44+
tasks = new LinkedList<>();
4445
runTask(task);
46+
} else {
47+
tasks.add(task);
4548
}
4649
}
4750

51+
private synchronized Runnable pollTask() {
52+
return tasks.poll();
53+
}
54+
55+
private Runnable nextTask() {
56+
Runnable runnable = pollTask();
57+
if (runnable == null) {
58+
synchronized (KeySequentialRunner.this) {
59+
runnable = pollTask();
60+
if (runnable == null) {
61+
keyRunners.remove(key);
62+
}
63+
}
64+
}
65+
return runnable;
66+
}
67+
4868
private void runTask(Runnable task) {
4969
underlyingExecutor.execute(() -> {
5070
task.run();
@@ -60,18 +80,6 @@ private void runTask(Runnable task) {
6080
}
6181
});
6282
}
63-
64-
private synchronized Runnable nextTask() {
65-
Runnable runnable = tasks.pollLast();
66-
if (runnable == null) {
67-
active = false;
68-
}
69-
return runnable;
70-
}
71-
72-
public synchronized boolean isActive() {
73-
return active;
74-
}
7583
}
7684

7785
private final Executor underlyingExecutor;
@@ -90,26 +98,12 @@ public KeySequentialRunner(Executor underlyingExecutor, TaskExceptionHandler exc
9098
}
9199

92100
public synchronized void run(Key key, Runnable task) {
93-
KeyRunner runner = keyRunners.get(key);
94-
if (runner == null) {
95-
runner = new KeyRunner();
96-
keyRunners.put(key, runner);
97-
}
98-
runner.run(() -> {
101+
keyRunners.computeIfAbsent(key, KeyRunner::new).run(() -> {
99102
try {
100103
task.run();
101104
} catch (Throwable t) {
102105
exceptionHandler.handleTaskException(t);
103106
}
104107
});
105-
scavengeInactiveRunners();
106-
}
107-
108-
private void scavengeInactiveRunners() {
109-
for (Key key : new ArrayList<>(keyRunners.keySet())) {
110-
if (!keyRunners.get(key).isActive()) {
111-
keyRunners.remove(key);
112-
}
113-
}
114108
}
115109
}

src/test/java/com/jano7/executor/KeySequentialRunnerTest.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ of this software and associated documentation files (the "Software"), to deal
2727

2828
import java.lang.reflect.Field;
2929
import java.util.Map;
30-
import java.util.concurrent.*;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.TimeUnit;
3134

3235
import static org.junit.Assert.assertEquals;
3336
import static org.junit.Assert.assertTrue;
@@ -38,15 +41,8 @@ public class KeySequentialRunnerTest {
3841
public void noMemoryLeak() throws InterruptedException, IllegalAccessException, NoSuchFieldException {
3942
final int threadCount = 10;
4043
ExecutorService underlyingExecutor = Executors.newFixedThreadPool(threadCount);
41-
Executor testExecutor = (Runnable runnable) -> {
42-
try {
43-
underlyingExecutor.execute(runnable);
44-
} catch (RejectedExecutionException ignored) {
45-
runnable.run();
46-
}
47-
};
4844

49-
KeySequentialRunner<String> runner = new KeySequentialRunner<>(testExecutor);
45+
KeySequentialRunner<String> runner = new KeySequentialRunner<>(underlyingExecutor);
5046

5147
Field keyRunners = KeySequentialRunner.class.getDeclaredField("keyRunners");
5248
keyRunners.setAccessible(true);
@@ -70,9 +66,6 @@ public void noMemoryLeak() throws InterruptedException, IllegalAccessException,
7066
underlyingExecutor.shutdown();
7167
underlyingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
7268

73-
runner.run("key", () -> {
74-
});
75-
7669
assertEquals(0, ((Map<?, ?>) keyRunners.get(runner)).size());
7770
}
7871
}

0 commit comments

Comments
 (0)