Add OptionalInt overload for NylonExecutor.builder().maxThreads#480
Add OptionalInt overload for NylonExecutor.builder().maxThreads#480
Conversation
Generate changelog in
|
✅ Successfully generated changelog entry!What happened?Your changelog entries have been stored in the database as part of our migration to ChangelogV3. Need to regenerate?Simply interact with the changelog bot comment again to regenerate these entries. |
|
I have concerns with how we are proposing to use this internally, and if we don't need this I'd prefer to increase the surface area of this API. |
| return maxThreads(OptionalInt.of(value)); | ||
| } | ||
|
|
||
| @Override | ||
| public QueueSizeStage maxThreads(OptionalInt value) { | ||
| Preconditions.checkState(maxThreads.isEmpty(), "maxThreads has already been configured"); | ||
| if (value <= 0) { | ||
| throw new SafeIllegalArgumentException("maxThreads must be positive"); | ||
| if (value.isPresent() && value.getAsInt() <= 0) { | ||
| throw new SafeIllegalArgumentException( | ||
| "maxThreads must be positive", SafeArg.of("maxThreads", value.getAsInt())); | ||
| } | ||
| maxThreads = OptionalInt.of(value); | ||
| maxThreads = value; |
There was a problem hiding this comment.
does either of these actually limit the maximum number of threads created for this executor to concurrently execute tasks?
There was a problem hiding this comment.
sample failing test:
@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5})
void testMaxThreadsLimitsTotalThreads(int maxThreads) {
AtomicInteger threadsCreated = new AtomicInteger();
ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("test-%d")
.setThreadFactory(r -> {
int id = threadsCreated.incrementAndGet();
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("thread-" + id);
return thread;
})
.build());
try {
CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
int tasks = 3 * maxThreads;
List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(maxThreads)
.build());
for (int i = 0; i < tasks; i++) {
futures.add(executor.submit(() -> {
countDownLatch.countDown();
countDownLatch.await();
Thread thread = Thread.currentThread();
threadIds.add(thread.getId());
return thread.getName();
}));
}
assertThat(Futures.successfulAsList(futures))
.succeedsWithin(Duration.ofSeconds(10))
.asInstanceOf(InstanceOfAssertFactories.list(String.class))
.hasSize(tasks)
.allSatisfy(value -> assertThat(value).startsWith("foo-"));
assertThat(threadsCreated.get())
.as("should create at most %s threads", maxThreads)
.isLessThanOrEqualTo(maxThreads);
assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);
assertThat(delegate)
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
.satisfies(threadPoolExecutor -> {
assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
});
} finally {
assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
.as("Delegate failed to stop")
.isTrue();
}
}
There was a problem hiding this comment.
that test counts how many threads are created (identified by Thread.id()), but I think Nylon and the backing org.jboss.threads.EnhancedViewExecutor are capping how many are simultaneously running.
This test passes:
@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5})
void testMaxThreadsLimitsTotalThreads(int maxThreads) {
AtomicInteger threadsCreated = new AtomicInteger();
ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("test-%d")
.setThreadFactory(r -> {
int id = threadsCreated.incrementAndGet();
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("thread-" + id);
return thread;
})
.build());
try {
CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
int tasks = 3 * maxThreads;
List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
List<Instant> threadStarts = Collections.synchronizedList(new ArrayList<>(maxThreads));
List<Instant> threadEnds = Collections.synchronizedList(new ArrayList<>(maxThreads));
ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(maxThreads)
.build());
for (int i = 0; i < tasks; i++) {
futures.add(executor.submit(() -> {
threadStarts.add(Instant.now());
countDownLatch.countDown();
countDownLatch.await();
Thread thread = Thread.currentThread();
threadIds.add(thread.getId());
threadEnds.add(Instant.now());
return thread.getName();
}));
}
assertThat(Futures.successfulAsList(futures))
.succeedsWithin(Duration.ofSeconds(10))
.asInstanceOf(InstanceOfAssertFactories.list(String.class))
.hasSize(tasks)
.allSatisfy(value -> assertThat(value).startsWith("foo-"));
ThreadPeak threadPeak = peakThreads(threadStarts, threadEnds);
assertThat(threadPeak.peak)
.as(
"should have at most %s threads running at once. Starts: %s. Ends: %s. List: %s",
maxThreads, threadStarts, threadEnds, threadPeak.description)
.isLessThanOrEqualTo(maxThreads);
// assertThat(threadsCreated.get())
// .as("should create at most %s threads. Thread ids: %s", maxThreads, threadIds)
// .isLessThanOrEqualTo(maxThreads);
// assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);
assertThat(delegate)
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
.satisfies(threadPoolExecutor -> {
// assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
// assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
// assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
});
} finally {
assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
.as("Delegate failed to stop")
.isTrue();
}
}
record ThreadPeak(int peak, String description) {}
/**
* The maximum number of threads that were running at once. A thread is counted as running if it has a start
* instant but not yet an end instant.
*/
private static ThreadPeak peakThreads(List<Instant> starts, List<Instant> ends) {
// direction is +1 for start or -1 for ends
record Tick(Instant instant, int direction) {}
List<Tick> ticks = Streams.concat(
starts.stream().map(instant -> new Tick(instant, 1)),
ends.stream().map(instant -> new Tick(instant, -1)))
.sorted(Comparator.comparing(Tick::instant))
.toList();
int running = 0;
int max = 0;
for (Tick tick : ticks) {
running += tick.direction;
max = Math.max(max, running);
}
return new ThreadPeak(max, ticks.toString());
}
So for trying to cap memory usage from created thread stacks, maxThreads is not helpful
|
No longer planning to use this, so don't need to expand the NylonExecutor API |
Before this PR
Users who may or may not have a maxThreads value to set have to do an unergonomic intermediate builder to conditionally call the
maxThreads(int)method:After this PR
==COMMIT_MSG==
Add OptionalInt overload for NylonExecutor.builder().maxThreads
==COMMIT_MSG==
With this change, they can now use a fluent builder all the way through, and express as:
Possible downsides?