see reactor#1804 Enforce 100K task cap on Schedulers.boundedElastic()
While introducing the singleton version of BoundedElasticScheduler,
we might as well make it truly bounded, including in the number of task
submissions it can "absorb" during a spike. Used to be unbounded, now
bounded to 100K tasks.

This is also more likely to uncover problematic scheduling of blocking
tasks. Users switching from `elastic()` to `boundedElastic()` and
seeing `RejectedExecutionException`s should challenge why their Reactor
app schedules so many blocking tasks, and if still legitimate and
necessary, use a `newBoundedElastic` instance that is better tuned to
their workload.
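The cap described above behaves like a JDK `ThreadPoolExecutor` with a bounded work queue: once both the thread cap and the queue are saturated, further submissions fail fast with `RejectedExecutionException`. A minimal stdlib sketch of that saturation behavior (an analogue only, not Reactor's actual per-worker deferral mechanism; the class name is invented for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedSaturationDemo {
    public static void main(String[] args) throws Exception {
        // 1 thread + 2 queued tasks: a tiny stand-in for boundedElastic's
        // "CPU cores x 10" thread cap and 100K task cap.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));

        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(() -> {                 // occupies the single thread
            try { gate.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { });             // fills queue slot 1
        pool.execute(() -> { });             // fills queue slot 2

        int rejected = 0;
        try {
            pool.execute(() -> { });         // thread cap + queue cap both reached
        } catch (RejectedExecutionException ree) {
            rejected++;                      // default AbortPolicy rejects
        }
        System.out.println("rejected=" + rejected);

        gate.countDown();
        pool.shutdown();
    }
}
```

In Reactor itself, the remedy the commit message names for legitimate spikes is a dedicated `Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name)` instance tuned to the workload, rather than raising the shared scheduler's caps.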

Reviewed-in: reactor#1900
simonbasle authored Sep 20, 2019
1 parent f705674 commit 32d2a29
Showing 4 changed files with 79 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/asciidoc/coreFeatures.adoc
@@ -204,7 +204,7 @@ problems and lead to too many threads (see below).
* A bounded elastic thread pool (`Schedulers.boundedElastic()`). Like its predecessor `elastic()`, it
creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are
also disposed. Unlike its `elastic()` predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10).
- Tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available
+ Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available
(when scheduling with a delay, the delay starts when the thread becomes available). This is a better choice for I/O blocking work.
`Schedulers.boundedElastic()` is a handy way to give a blocking process its own thread so that
it does not tie up other resources, but doesn't pressure the system too much with new threads. See <<faq.wrap-blocking>>.
3 changes: 2 additions & 1 deletion docs/asciidoc/faq.adoc
@@ -35,7 +35,8 @@ from `Schedulers.boundedElastic()`.
You should use a `Mono`, because the source returns one value. You should use
`Schedulers.boundedElastic`, because it creates a dedicated thread to wait for the
blocking resource without impacting other non-blocking processing, while also ensuring
- that there is a limit to the amount of threads that can be created.
+ that there is a limit to the amount of threads that can be created, and blocking tasks
+ that can be enqueued and deferred during a spike.

Note that `subscribeOn` does not subscribe to the `Mono`. It specifies what
kind of `Scheduler` to use when a subscribe call happens.
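The point about `subscribeOn` being declarative can be sketched with plain JDK types: picking the executor happens at assembly time, but nothing executes until an explicit trigger plays the role of the subscribe call (a loose analogue with invented names, not Reactor's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class LazyTriggerDemo {
    public static void main(String[] args) {
        ExecutorService blockingPool = Executors.newSingleThreadExecutor();

        // Assembly: the executor is chosen here, but the task does not run yet,
        // just as subscribeOn only records which Scheduler to use later.
        Supplier<CompletableFuture<String>> pipeline =
                () -> CompletableFuture.supplyAsync(() -> "value", blockingPool);
        System.out.println("assembled, nothing executed yet");

        // "Subscribe": only now is the work submitted and awaited.
        String result = pipeline.get().join();
        System.out.println("result=" + result);

        blockingPool.shutdown();
    }
}
```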
21 changes: 13 additions & 8 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
@@ -93,14 +93,14 @@ public abstract class Schedulers {
/**
* Default maximum number of enqueued tasks for the global {@link #boundedElastic()} {@link Scheduler}, initialized
* by system property {@code reactor.schedulers.defaultBoundedElasticQueueSize} and falls back to
- * unbounded by default ({@link Integer#MAX_VALUE}).
+ * a bound of 100 000 tasks.
*
* @see #boundedElastic()
*/
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultBoundedElasticQueueSize"))
.map(Integer::parseInt)
- .orElseGet(() -> Integer.MAX_VALUE);
+ .orElse(100000);
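The resolution logic in this hunk can be exercised on its own; the sketch below mirrors it with a hypothetical helper. Note that the real `DEFAULT_BOUNDED_ELASTIC_QUEUESIZE` is read once during `Schedulers` class initialization, so in practice the property must be set before that class loads, typically via `-Dreactor.schedulers.defaultBoundedElasticQueueSize=<size>` on the command line:

```java
import java.util.Optional;

public class QueueSizePropertyDemo {
    // Mirrors the constant's initializer: property override, else the 100K default.
    static int resolveQueueSize() {
        return Optional.ofNullable(
                        System.getProperty("reactor.schedulers.defaultBoundedElasticQueueSize"))
                .map(Integer::parseInt)
                .orElse(100_000);
    }

    public static void main(String[] args) {
        System.out.println("default=" + resolveQueueSize());
        // Simulate a startup-time override (only effective here because we
        // re-run the resolution; the real constant would already be frozen).
        System.setProperty("reactor.schedulers.defaultBoundedElasticQueueSize", "250000");
        System.out.println("tuned=" + resolveQueueSize());
    }
}
```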

static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;

@@ -193,17 +193,22 @@ public static Scheduler elastic() {
* Workers, reusing them once the Workers have been shut down. The underlying daemon
* threads can be evicted if idle for more than {@link BoundedElasticScheduler#DEFAULT_TTL_SECONDS 60} seconds.
* <p>
- * The maximum number of created thread pools is bounded by a {@code cap} (by default,
+ * The maximum number of created thread pools is bounded by a {@code cap} (by default
* ten times the number of available CPU cores, see {@link #DEFAULT_BOUNDED_ELASTIC_SIZE}).
- * If a worker is requested while the cap is reached, a facade {@link reactor.core.scheduler.Scheduler.Worker}
+ * The maximum number of task submissions that can be enqueued and deferred after this thread cap
+ * has been reached is bounded (by default 100K additional tasks, see {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE}).
+ * <p>
+ * If a worker is requested while the thread cap is reached, a facade {@link reactor.core.scheduler.Scheduler.Worker}
* is provided which will enqueue the tasks submitted to it, deferring the actual submission
* of tasks until a thread-backed worker becomes available. This can thus affect initial delays of tasks.
- * If a task is directly submitted to the {@link Scheduler} while the cap has been reached,
- * it will be similarly enqueue and deferred (unless property {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE} is
- * tuned on startup).
+ * If a task is directly submitted to the {@link Scheduler} while the thread cap has been reached,
+ * it will be similarly enqueued and deferred.
+ * In both cases, once the task cap has also been reached, further submissions are rejected with a
+ * {@link RejectedExecutionException}.
*
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
- * the number of backing threads, reuses threads and evict idle ones
+ * the number of backing threads and after that on the number of enqueued tasks,
+ * that reuses threads and evicts idle ones
*/
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
@@ -595,6 +595,67 @@ public void deferredWorkerRejectsTasksAfterBeingDisposed() {
.withMessage("Worker has been disposed");
}


@Test
public void mixOfDirectAndWorkerTasksWithRejectionAfter100kLimit() {
AtomicInteger taskDone = new AtomicInteger();
AtomicInteger taskRejected = new AtomicInteger();

int limit = 100_000;
int workerCount = 70_000;

Scheduler scheduler = afterTest.autoDispose(Schedulers.newBoundedElastic(
1, limit,
"tasksRejectionAfter100kLimit"
));
Scheduler.Worker activeWorker = afterTest.autoDispose(scheduler.createWorker());
Scheduler.Worker fakeWorker = afterTest.autoDispose(scheduler.createWorker());

for (int i = 0; i < limit + 10; i++) {
try {
if (i < workerCount) {
//larger subset of tasks are submitted to the worker
fakeWorker.schedule(taskDone::incrementAndGet);
}
else if (i < limit) {
//smaller subset of tasks are submitted directly to the scheduler
scheduler.schedule(taskDone::incrementAndGet);
}
else if (i % 2 == 0) {
//half of over-limit tasks are submitted directly to the scheduler, half to worker
scheduler.schedule(taskDone::incrementAndGet);
}
else {
//the other half of over-limit tasks are submitted to the worker
fakeWorker.schedule(taskDone::incrementAndGet);
}
}
catch (RejectedExecutionException ree) {
taskRejected.incrementAndGet();
}
}

assertThat(taskDone).as("taskDone before releasing activeWorker").hasValue(0);
assertThat(taskRejected).as("task rejected").hasValue(10);

activeWorker.dispose();

Awaitility.await().atMost(500, TimeUnit.MILLISECONDS)
.untilAsserted(() ->
assertThat(taskDone).as("all fakeWorker tasks done")
.hasValue(workerCount)
);

fakeWorker.dispose();

//TODO maybe investigate: a large number of directly submitted deferred tasks takes more time to be executed
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(taskDone).as("all deferred tasks done")
.hasValue(limit)
);
}

@Test
public void defaultBoundedElasticConfigurationIsConsistentWithJavadoc() {
Schedulers.CachedScheduler cachedBoundedElastic = (Schedulers.CachedScheduler) Schedulers.boundedElastic();
@@ -612,10 +673,10 @@ public void defaultBoundedElasticConfigurationIsConsistentWithJavadoc() {
.isEqualTo(BoundedElasticScheduler.DEFAULT_TTL_SECONDS)
.isEqualTo(60);

- //unbounded task queueing
+ //100K bounded task queueing
assertThat(boundedElastic.deferredTaskCap)
.as("default unbounded task queueing")
- .isEqualTo(Integer.MAX_VALUE)
+ .isEqualTo(100_000)
.isEqualTo(Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE);
}

