Skip to content

Add OptionalInt overload for NylonExecutor.builder().maxThreads#480

Closed
ash211 wants to merge 1 commit intodevelopfrom
aash/optionalInt
Closed

Add OptionalInt overload for NylonExecutor.builder().maxThreads#480
ash211 wants to merge 1 commit intodevelopfrom
aash/optionalInt

Conversation

@ash211
Copy link

@ash211 ash211 commented Sep 13, 2025

Before this PR

Users who may or may not have a maxThreads value to set have to do an unergonomic intermediate builder to conditionally call the maxThreads(int) method:

OptionalInt maxThreads = ...

MaxThreadsStage builder =
    NylonExecutor.builder().name(name.get()).executor(sharedExecutor.get());
maxThreads.ifPresent(builder::maxThreads); // <--- here
return builder.uncaughtExceptionHandler(executorFactory.uncaughtExceptionHandler(name))
    .build();

After this PR

==COMMIT_MSG==
Add OptionalInt overload for NylonExecutor.builder().maxThreads
==COMMIT_MSG==

With this change, they can now use a fluent builder all the way through, and express as:

OptionalInt maxThreads = ...

return NylonExecutor.builder()
    .name(name.get())
    .executor(sharedExecutor.get())
    .maxThreads(maxThreads) // <--- here
    .uncaughtExceptionHandler(executorFactory.uncaughtExceptionHandler(name))
    .build();

Possible downsides?

  • slightly larger API surface

@changelog-app
Copy link

changelog-app bot commented Sep 13, 2025

Generate changelog in changelog/@unreleased

Type (Select exactly one)

  • Feature (Adding new functionality)
  • Improvement (Improving existing functionality)
  • Fix (Fixing an issue with existing functionality)
  • Break (Creating a new major version by breaking public APIs)
  • Deprecation (Removing functionality in a non-breaking way)
  • Migration (Automatically moving data/functionality to a new system)

Description

Add OptionalInt overload for NylonExecutor.builder().maxThreads

Check the box to generate changelog(s)

  • Generate changelog entry

@changelog-app
Copy link

changelog-app bot commented Sep 13, 2025

Successfully generated changelog entry!

What happened?

Your changelog entries have been stored in the database as part of our migration to ChangelogV3.

Need to regenerate?

Simply interact with the changelog bot comment again to regenerate these entries.

@ash211 ash211 requested a review from schlosna September 13, 2025 06:56
@pkoenig10
Copy link
Member

I have concerns with how we are proposing to use this internally, and if we don't need this I'd prefer to increase the surface area of this API.

Comment on lines +114 to +124
return maxThreads(OptionalInt.of(value));
}

@Override
public QueueSizeStage maxThreads(OptionalInt value) {
Preconditions.checkState(maxThreads.isEmpty(), "maxThreads has already been configured");
if (value <= 0) {
throw new SafeIllegalArgumentException("maxThreads must be positive");
if (value.isPresent() && value.getAsInt() <= 0) {
throw new SafeIllegalArgumentException(
"maxThreads must be positive", SafeArg.of("maxThreads", value.getAsInt()));
}
maxThreads = OptionalInt.of(value);
maxThreads = value;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does either of these actually limit the maximum number of threads created for this executor to concurrently execute tasks?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sample failing test:

    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5})
    void testMaxThreadsLimitsTotalThreads(int maxThreads) {
        AtomicInteger threadsCreated = new AtomicInteger();
        ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("test-%d")
                .setThreadFactory(r -> {
                    int id = threadsCreated.incrementAndGet();
                    Thread thread = Executors.defaultThreadFactory().newThread(r);
                    thread.setName("thread-" + id);
                    return thread;
                })
                .build());
        try {
            CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
            int tasks = 3 * maxThreads;
            List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
            List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
                    .name("foo")
                    .executor(delegate)
                    .maxThreads(maxThreads)
                    .build());

            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    countDownLatch.countDown();
                    countDownLatch.await();
                    Thread thread = Thread.currentThread();
                    threadIds.add(thread.getId());
                    return thread.getName();
                }));
            }

            assertThat(Futures.successfulAsList(futures))
                    .succeedsWithin(Duration.ofSeconds(10))
                    .asInstanceOf(InstanceOfAssertFactories.list(String.class))
                    .hasSize(tasks)
                    .allSatisfy(value -> assertThat(value).startsWith("foo-"));

            assertThat(threadsCreated.get())
                    .as("should create at most %s threads", maxThreads)
                    .isLessThanOrEqualTo(maxThreads);
            assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);

            assertThat(delegate)
                    .asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
                    .satisfies(threadPoolExecutor -> {
                        assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
                    });
        } finally {
            assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
                    .as("Delegate failed to stop")
                    .isTrue();
        }
    }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that test counts how many threads are created (identified by Thread.id()), but I think Nylon and the backing org.jboss.threads.EnhancedViewExecutor are capping how many are simultaneously running.

This test passes:

    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5})
    void testMaxThreadsLimitsTotalThreads(int maxThreads) {
        AtomicInteger threadsCreated = new AtomicInteger();
        ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("test-%d")
                .setThreadFactory(r -> {
                    int id = threadsCreated.incrementAndGet();
                    Thread thread = Executors.defaultThreadFactory().newThread(r);
                    thread.setName("thread-" + id);
                    return thread;
                })
                .build());
        try {
            CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
            int tasks = 3 * maxThreads;
            List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
            List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            List<Instant> threadStarts = Collections.synchronizedList(new ArrayList<>(maxThreads));
            List<Instant> threadEnds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
                    .name("foo")
                    .executor(delegate)
                    .maxThreads(maxThreads)
                    .build());
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    threadStarts.add(Instant.now());
                    countDownLatch.countDown();
                    countDownLatch.await();
                    Thread thread = Thread.currentThread();
                    threadIds.add(thread.getId());
                    threadEnds.add(Instant.now());
                    return thread.getName();
                }));
            }
            assertThat(Futures.successfulAsList(futures))
                    .succeedsWithin(Duration.ofSeconds(10))
                    .asInstanceOf(InstanceOfAssertFactories.list(String.class))
                    .hasSize(tasks)
                    .allSatisfy(value -> assertThat(value).startsWith("foo-"));
            ThreadPeak threadPeak = peakThreads(threadStarts, threadEnds);
            assertThat(threadPeak.peak)
                    .as(
                            "should have at most %s threads running at once. Starts: %s. Ends: %s. List: %s",
                            maxThreads, threadStarts, threadEnds, threadPeak.description)
                    .isLessThanOrEqualTo(maxThreads);
            // assertThat(threadsCreated.get())
            //   .as("should create at most %s threads. Thread ids: %s", maxThreads, threadIds)
            //   .isLessThanOrEqualTo(maxThreads);
            // assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);
            assertThat(delegate)
                    .asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
                    .satisfies(threadPoolExecutor -> {
                        // assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
                        // assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
                        // assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
                    });
        } finally {
            assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
                    .as("Delegate failed to stop")
                    .isTrue();
        }
    }

    record ThreadPeak(int peak, String description) {}

    /**
     * The maximum number of threads that were running at once. A thread is counted as running if it has a start
     * instant but not yet an end instant.
     */
    private static ThreadPeak peakThreads(List<Instant> starts, List<Instant> ends) {
        // direction is +1 for start or -1 for ends
        record Tick(Instant instant, int direction) {}

        List<Tick> ticks = Streams.concat(
                        starts.stream().map(instant -> new Tick(instant, 1)),
                        ends.stream().map(instant -> new Tick(instant, -1)))
                .sorted(Comparator.comparing(Tick::instant))
                .toList();

        int running = 0;
        int max = 0;
        for (Tick tick : ticks) {
            running += tick.direction;
            max = Math.max(max, running);
        }
        return new ThreadPeak(max, ticks.toString());
    }

So for trying to cap memory usage from created thread stacks, maxThreads is not helpful

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ash211
Copy link
Author

ash211 commented Sep 16, 2025

No longer planning to use this, so don't need to expand the NylonExecutor API

@ash211 ash211 closed this Sep 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants