Skip to content

Commit 34c7ba1

Browse files
committed
Favour FutureTask for concurrency support
TestNG currently has a custom implementation of ThreadPoolExecutor named GraphThreadPoolExecutor. This class was created to facilitate concurrency in a DAG. Now that we are on JDK11, we can very well move over to leveraging FutureTask based implementations and thus decouple ourselves from the Executor and just focus on orchestrating the next node retrieval for execution. Since this is experimental, we are currently providing a JVM based switch that can fall back to the old Behaviour in case of any issues. JVM argument to use “-Dtestng.favor.custom.thread-pool.executor=true”
1 parent 3cb01b4 commit 34c7ba1

12 files changed

+464
-98
lines changed

testng-core-api/src/main/java/org/testng/internal/RuntimeBehavior.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,24 @@ public final class RuntimeBehavior {
2020
public static final String IGNORE_CALLBACK_INVOCATION_SKIPS = "testng.ignore.callback.skip";
2121
public static final String SYMMETRIC_LISTENER_EXECUTION = "testng.listener.execution.symmetric";
2222
public static final String PREFERENTIAL_LISTENERS = "testng.preferential.listeners.package";
23+
public static final String FAVOR_CUSTOM_THREAD_POOL_EXECUTOR =
24+
"testng.favor.custom.thread-pool.executor";
2325

2426
private RuntimeBehavior() {}
2527

2628
public static boolean ignoreCallbackInvocationSkips() {
2729
return Boolean.getBoolean(IGNORE_CALLBACK_INVOCATION_SKIPS);
2830
}
2931

32+
/**
33+
* @return - <code>true</code> if TestNG is to be using its custom implementation of {@link
34+
* java.util.concurrent.ThreadPoolExecutor} for running concurrent tests. Defaults to <code>
35+
* false</code>
36+
*/
37+
public static boolean favourCustomThreadPoolExecutor() {
38+
return Boolean.getBoolean(FAVOR_CUSTOM_THREAD_POOL_EXECUTOR);
39+
}
40+
3041
/**
3142
* @return - A comma separated list of packages that represent special listeners which users will
3243
* expect to be executed after executing the regular listeners. Here special listeners can be
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package org.testng;
2+
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.ThreadPoolExecutor;
6+
import java.util.concurrent.TimeUnit;
7+
import org.testng.internal.IConfiguration;
8+
import org.testng.internal.RuntimeBehavior;
9+
import org.testng.internal.Utils;
10+
import org.testng.internal.thread.TestNGThreadFactory;
11+
import org.testng.internal.thread.graph.GraphOrchestrator;
12+
import org.testng.log4testng.Logger;
13+
import org.testng.thread.IExecutorFactory;
14+
import org.testng.thread.ITestNGThreadPoolExecutor;
15+
import org.testng.thread.IThreadWorkerFactory;
16+
17+
class SuiteTaskExecutor {
18+
private final BlockingQueue<Runnable> queue;
19+
private final IDynamicGraph<ISuite> graph;
20+
private final IThreadWorkerFactory<ISuite> factory;
21+
private final IConfiguration configuration;
22+
23+
private final int threadPoolSize;
24+
25+
private ExecutorService service;
26+
27+
private static final Logger LOGGER = Logger.getLogger(SuiteTaskExecutor.class);
28+
29+
public SuiteTaskExecutor(
30+
IConfiguration configuration,
31+
IThreadWorkerFactory<ISuite> factory,
32+
BlockingQueue<Runnable> queue,
33+
IDynamicGraph<ISuite> graph,
34+
int threadPoolSize) {
35+
this.configuration = configuration;
36+
this.factory = factory;
37+
this.queue = queue;
38+
this.graph = graph;
39+
this.threadPoolSize = threadPoolSize;
40+
}
41+
42+
public void execute() {
43+
String name = "suites-";
44+
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
45+
IExecutorFactory execFactory = configuration.getExecutorFactory();
46+
ITestNGThreadPoolExecutor executor =
47+
execFactory.newSuiteExecutor(
48+
name,
49+
graph,
50+
factory,
51+
threadPoolSize,
52+
threadPoolSize,
53+
Integer.MAX_VALUE,
54+
TimeUnit.MILLISECONDS,
55+
queue,
56+
null);
57+
executor.run();
58+
service = executor;
59+
} else {
60+
service =
61+
new ThreadPoolExecutor(
62+
threadPoolSize,
63+
threadPoolSize,
64+
Integer.MAX_VALUE,
65+
TimeUnit.MILLISECONDS,
66+
queue,
67+
new TestNGThreadFactory(name));
68+
GraphOrchestrator<ISuite> executor = new GraphOrchestrator<>(service, factory, graph, null);
69+
executor.run();
70+
}
71+
}
72+
73+
public void awaitCompletion() {
74+
Utils.log("TestNG", 2, "Starting executor for all suites");
75+
try {
76+
boolean ignored = service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
77+
service.shutdownNow();
78+
} catch (InterruptedException handled) {
79+
Thread.currentThread().interrupt();
80+
LOGGER.error(handled.getMessage(), handled);
81+
}
82+
}
83+
}

testng-core/src/main/java/org/testng/TestNG.java

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.ServiceLoader;
2020
import java.util.Set;
2121
import java.util.concurrent.LinkedBlockingQueue;
22-
import java.util.concurrent.TimeUnit;
2322
import org.testng.SuiteRunner.TestListenersContainer;
2423
import org.testng.annotations.ITestAnnotation;
2524
import org.testng.collections.Lists;
@@ -60,7 +59,6 @@
6059
import org.testng.reporters.XMLReporter;
6160
import org.testng.reporters.jq.Main;
6261
import org.testng.thread.IExecutorFactory;
63-
import org.testng.thread.ITestNGThreadPoolExecutor;
6462
import org.testng.thread.IThreadWorkerFactory;
6563
import org.testng.util.Strings;
6664
import org.testng.xml.IPostProcessor;
@@ -835,6 +833,8 @@ public void setVerbose(int verbose) {
835833
m_verbose = verbose;
836834
}
837835

836+
/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
837+
@Deprecated
838838
public void setExecutorFactoryClass(String clazzName) {
839839
this.m_executorFactory = createExecutorFactoryInstanceUsing(clazzName);
840840
}
@@ -853,10 +853,14 @@ private IExecutorFactory createExecutorFactoryInstanceUsing(String clazzName) {
853853
clazzName + " does not implement " + IExecutorFactory.class.getName());
854854
}
855855

856+
/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
857+
@Deprecated
856858
public void setExecutorFactory(IExecutorFactory factory) {
857859
this.m_executorFactory = factory;
858860
}
859861

862+
/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
863+
@Deprecated
860864
public IExecutorFactory getExecutorFactory() {
861865
if (this.m_executorFactory == null) {
862866
this.m_executorFactory = createExecutorFactoryInstanceUsing(DEFAULT_THREADPOOL_FACTORY);
@@ -1227,29 +1231,15 @@ public List<ISuite> runSuitesLocally() {
12271231
IThreadWorkerFactory<ISuite> factory =
12281232
new SuiteWorkerFactory(
12291233
suiteRunnerMap, 0 /* verbose hasn't been set yet */, getDefaultSuiteName());
1230-
ITestNGThreadPoolExecutor pooledExecutor =
1231-
this.getExecutorFactory()
1232-
.newSuiteExecutor(
1233-
"suites",
1234-
suiteGraph,
1235-
factory,
1236-
m_suiteThreadPoolSize,
1237-
m_suiteThreadPoolSize,
1238-
Integer.MAX_VALUE,
1239-
TimeUnit.MILLISECONDS,
1240-
new LinkedBlockingQueue<>(),
1241-
null);
1242-
1243-
Utils.log("TestNG", 2, "Starting executor for all suites");
1244-
// Run all suites in parallel
1245-
pooledExecutor.run();
1246-
try {
1247-
pooledExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
1248-
pooledExecutor.shutdownNow();
1249-
} catch (InterruptedException handled) {
1250-
Thread.currentThread().interrupt();
1251-
error("Error waiting for concurrent executors to finish " + handled.getMessage());
1252-
}
1234+
SuiteTaskExecutor taskExecutor =
1235+
new SuiteTaskExecutor(
1236+
this.m_configuration,
1237+
factory,
1238+
new LinkedBlockingQueue<>(),
1239+
suiteGraph,
1240+
m_suiteThreadPoolSize);
1241+
taskExecutor.execute();
1242+
taskExecutor.awaitCompletion();
12531243

12541244
//
12551245
// Generate the suites report

testng-core/src/main/java/org/testng/TestRunner.java

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.concurrent.BlockingQueue;
1616
import java.util.concurrent.LinkedBlockingQueue;
1717
import java.util.concurrent.PriorityBlockingQueue;
18-
import java.util.concurrent.TimeUnit;
1918
import java.util.concurrent.atomic.AtomicBoolean;
2019
import java.util.concurrent.atomic.AtomicReference;
2120
import java.util.stream.Collectors;
@@ -61,7 +60,6 @@
6160
import org.testng.internal.objects.IObjectDispenser;
6261
import org.testng.junit.IJUnitTestRunner;
6362
import org.testng.log4testng.Logger;
64-
import org.testng.thread.ITestNGThreadPoolExecutor;
6563
import org.testng.thread.IThreadWorkerFactory;
6664
import org.testng.thread.IWorker;
6765
import org.testng.util.Strings;
@@ -761,8 +759,6 @@ private static BlockingQueue<Runnable> newQueue(boolean needPrioritySort) {
761759
private void privateRun(XmlTest xmlTest) {
762760
boolean parallel = xmlTest.getParallel().isParallel();
763761

764-
// parallel
765-
int threadCount = parallel ? xmlTest.getThreadCount() : 1;
766762
// Make sure we create a graph based on the intercepted methods, otherwise an interceptor
767763
// removing methods would cause the graph never to terminate (because it would expect
768764
// termination from methods that never get invoked).
@@ -798,36 +794,11 @@ private void privateRun(XmlTest xmlTest) {
798794
if (graph.getNodeCount() <= 0) {
799795
return;
800796
}
801-
ITestNGThreadPoolExecutor executor =
802-
this.m_configuration
803-
.getExecutorFactory()
804-
.newTestMethodExecutor(
805-
"test=" + xmlTest.getName(),
806-
graph,
807-
this,
808-
threadCount,
809-
threadCount,
810-
0,
811-
TimeUnit.MILLISECONDS,
812-
newQueue(needPrioritySort),
813-
methodComparator);
814-
executor.run();
815-
try {
816-
long timeOut = m_xmlTest.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS);
817-
Utils.log(
818-
"TestRunner",
819-
2,
820-
"Starting executor for test "
821-
+ m_xmlTest.getName()
822-
+ " with time out:"
823-
+ timeOut
824-
+ " milliseconds.");
825-
executor.awaitTermination(timeOut, TimeUnit.MILLISECONDS);
826-
executor.shutdownNow();
827-
} catch (InterruptedException handled) {
828-
LOGGER.error(handled.getMessage(), handled);
829-
Thread.currentThread().interrupt();
830-
}
797+
TestTaskExecutor taskExecutor =
798+
new TestTaskExecutor(
799+
m_configuration, xmlTest, this, newQueue(needPrioritySort), graph, methodComparator);
800+
taskExecutor.execute();
801+
taskExecutor.awaitCompletion();
831802
return;
832803
}
833804
List<ITestNGMethod> freeNodes = graph.getFreeNodes();
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.testng;
2+
3+
import java.util.Comparator;
4+
import java.util.concurrent.BlockingQueue;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.ThreadPoolExecutor;
7+
import java.util.concurrent.TimeUnit;
8+
import org.testng.internal.IConfiguration;
9+
import org.testng.internal.RuntimeBehavior;
10+
import org.testng.internal.Utils;
11+
import org.testng.internal.thread.TestNGThreadFactory;
12+
import org.testng.internal.thread.graph.GraphOrchestrator;
13+
import org.testng.log4testng.Logger;
14+
import org.testng.thread.IExecutorFactory;
15+
import org.testng.thread.ITestNGThreadPoolExecutor;
16+
import org.testng.thread.IThreadWorkerFactory;
17+
import org.testng.xml.XmlTest;
18+
19+
class TestTaskExecutor {
20+
private final BlockingQueue<Runnable> queue;
21+
private final Comparator<ITestNGMethod> comparator;
22+
private final IDynamicGraph<ITestNGMethod> graph;
23+
private final XmlTest xmlTest;
24+
private final IThreadWorkerFactory<ITestNGMethod> factory;
25+
private final IConfiguration configuration;
26+
private final long timeOut;
27+
28+
private ExecutorService service;
29+
30+
private static final Logger LOGGER = Logger.getLogger(TestTaskExecutor.class);
31+
32+
public TestTaskExecutor(
33+
IConfiguration configuration,
34+
XmlTest xmlTest,
35+
IThreadWorkerFactory<ITestNGMethod> factory,
36+
BlockingQueue<Runnable> queue,
37+
IDynamicGraph<ITestNGMethod> graph,
38+
Comparator<ITestNGMethod> comparator) {
39+
this.configuration = configuration;
40+
this.xmlTest = xmlTest;
41+
this.factory = factory;
42+
this.queue = queue;
43+
this.graph = graph;
44+
this.comparator = comparator;
45+
this.timeOut = xmlTest.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS);
46+
}
47+
48+
public void execute() {
49+
String name = "test-" + xmlTest.getName();
50+
int threadCount = xmlTest.getThreadCount();
51+
threadCount = Math.max(threadCount, 1);
52+
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
53+
IExecutorFactory execFactory = configuration.getExecutorFactory();
54+
ITestNGThreadPoolExecutor executor =
55+
execFactory.newTestMethodExecutor(
56+
name,
57+
graph,
58+
factory,
59+
threadCount,
60+
threadCount,
61+
0,
62+
TimeUnit.MILLISECONDS,
63+
queue,
64+
comparator);
65+
executor.run();
66+
service = executor;
67+
} else {
68+
service =
69+
new ThreadPoolExecutor(
70+
threadCount,
71+
threadCount,
72+
0,
73+
TimeUnit.MILLISECONDS,
74+
queue,
75+
new TestNGThreadFactory(name));
76+
GraphOrchestrator<ITestNGMethod> executor =
77+
new GraphOrchestrator<>(service, factory, graph, comparator);
78+
executor.run();
79+
}
80+
}
81+
82+
public void awaitCompletion() {
83+
String msg =
84+
String.format(
85+
"Starting executor test %d with time out: %d milliseconds.", timeOut, timeOut);
86+
Utils.log("TestTaskExecutor", 2, msg);
87+
try {
88+
boolean ignored = service.awaitTermination(timeOut, TimeUnit.MILLISECONDS);
89+
service.shutdownNow();
90+
} catch (InterruptedException handled) {
91+
LOGGER.error(handled.getMessage(), handled);
92+
Thread.currentThread().interrupt();
93+
}
94+
}
95+
}

testng-core/src/main/java/org/testng/internal/thread/DefaultThreadPoolExecutorFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
import org.testng.thread.ITestNGThreadPoolExecutor;
1212
import org.testng.thread.IThreadWorkerFactory;
1313

14+
/**
15+
* @deprecated - This implementation stands deprecated as of TestNG <code>v7.9.0</code>. There are
16+
* no alternatives for this implementation.
17+
*/
18+
@Deprecated
1419
public class DefaultThreadPoolExecutorFactory implements IExecutorFactory {
1520

1621
@Override

0 commit comments

Comments
 (0)