Skip to content

Commit 4188f58

Browse files
committed
Set removeOnCancelPolicy on the threadpool if supported
1 parent 294a8e8 commit 4188f58

File tree

2 files changed

+80
-9
lines changed

2 files changed

+80
-9
lines changed

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
*/
1616
package rx.internal.schedulers;
1717

18-
import rx.Scheduler;
19-
import rx.Subscription;
18+
import java.lang.reflect.Method;
19+
import java.util.concurrent.*;
20+
21+
import rx.*;
2022
import rx.functions.Action0;
21-
import rx.plugins.RxJavaPlugins;
22-
import rx.plugins.RxJavaSchedulersHook;
23+
import rx.plugins.*;
2324
import rx.subscriptions.Subscriptions;
2425

25-
import java.util.concurrent.*;
26-
2726
/**
2827
* @warn class description missing
2928
*/
@@ -35,6 +34,19 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription {
3534
/* package */
3635
public NewThreadWorker(ThreadFactory threadFactory) {
3736
executor = Executors.newScheduledThreadPool(1, threadFactory);
37+
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
38+
for (Method m : executor.getClass().getMethods()) {
39+
if (m.getName().equals("setRemoveOnCancelPolicy")
40+
&& m.getParameterCount() == 1
41+
&& m.getParameters()[0].getType() == Boolean.TYPE) {
42+
try {
43+
m.invoke(executor, true);
44+
} catch (Exception ex) {
45+
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
46+
}
47+
break;
48+
}
49+
}
3850
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
3951
}
4052

src/test/java/rx/schedulers/CachedThreadSchedulerTest.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package rx.schedulers;
1818

19+
import java.lang.management.*;
20+
import java.util.concurrent.*;
21+
22+
import junit.framework.Assert;
23+
1924
import org.junit.Test;
25+
2026
import rx.Observable;
2127
import rx.Scheduler;
22-
import rx.functions.Action1;
23-
import rx.functions.Func1;
24-
28+
import rx.functions.*;
2529
import static org.junit.Assert.assertTrue;
2630

2731
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -66,4 +70,59 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
6670
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
6771
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
6872
}
73+
74+
@Test(timeout = 30000)
75+
public void testCancelledTaskRetention() throws InterruptedException {
76+
try {
77+
ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);
78+
79+
System.out.println("Wait before GC");
80+
Thread.sleep(1000);
81+
82+
System.out.println("GC");
83+
System.gc();
84+
85+
Thread.sleep(1000);
86+
87+
88+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
89+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
90+
long initial = memHeap.getUsed();
91+
92+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
93+
94+
Scheduler.Worker w = Schedulers.io().createWorker();
95+
for (int i = 0; i < 750000; i++) {
96+
if (i % 50000 == 0) {
97+
System.out.println(" -> still scheduling: " + i);
98+
}
99+
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
100+
}
101+
102+
memHeap = memoryMXBean.getHeapMemoryUsage();
103+
long after = memHeap.getUsed();
104+
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
105+
106+
w.unsubscribe();
107+
108+
System.out.println("Wait before second GC");
109+
Thread.sleep(1000);
110+
111+
System.out.println("Second GC");
112+
System.gc();
113+
114+
Thread.sleep(1000);
115+
116+
memHeap = memoryMXBean.getHeapMemoryUsage();
117+
long finish = memHeap.getUsed();
118+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
119+
120+
if (finish > initial * 5) {
121+
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
122+
}
123+
} catch (NoSuchMethodException ex) {
124+
// not supported, no reason to test for it
125+
}
126+
}
127+
69128
}

0 commit comments

Comments
 (0)