Skip to content

Commit

Permalink
fix reactor#1243 Make default scheduler pool size configurable
Browse files Browse the repository at this point in the history
Allows the scheduler default poolsize constant to be configurable via a
system property. This constant is used to size default and no-argument
parallel Schedulers.

Falls back to ` Runtime.getRuntime().availableProcessors()`, removing
the old minimum of 4.
  • Loading branch information
smiklos authored and simonbasle committed Jun 15, 2018
1 parent a879741 commit bd21f68
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
19 changes: 10 additions & 9 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.core.scheduler;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -57,12 +58,15 @@
public abstract class Schedulers {

/**
* Default pool size, initialized to the number of processors available to the runtime
* on init (but with a minimum value of 4).
* Default pool size, initialized by system property `reactor.schedulers.defaultPoolSize`
* and falls back to the number of processors available to the runtime on init.
*
* @see Runtime#availableProcessors()
*/
public static final int DEFAULT_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 4);
public static final int DEFAULT_POOL_SIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultPoolSize"))
.map(Integer::parseInt)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());

static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;

Expand Down Expand Up @@ -235,8 +239,7 @@ public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory)
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newParallel(String name) {
return newParallel(name, Runtime.getRuntime()
.availableProcessors());
return newParallel(name, DEFAULT_POOL_SIZE);
}

/**
Expand Down Expand Up @@ -527,10 +530,8 @@ default Scheduler newSingle(ThreadFactory threadFactory) {
static final Supplier<Scheduler> ELASTIC_SUPPLIER =
() -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);

static final Supplier<Scheduler> PARALLEL_SUPPLIER = () -> newParallel(PARALLEL,
Runtime.getRuntime()
.availableProcessors(),
true);
static final Supplier<Scheduler> PARALLEL_SUPPLIER =
() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);

static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import org.junit.Assert;
import org.junit.Ignore;
Expand Down Expand Up @@ -105,7 +104,8 @@ public void onComplete() {
latch.await(8, TimeUnit.SECONDS);

long count = latch.getCount();
org.junit.Assert.assertTrue("Count > 0 : " + count + " (" + list + ") , Running on " + Schedulers.DEFAULT_POOL_SIZE + " CPU",
org.junit.Assert.assertTrue("Count > 0 : " + count + " (" + list + ") , Running on " +
Schedulers.DEFAULT_POOL_SIZE + " CPU",
latch.getCount() == 0);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void scanName() {
.is(SchedulersTest.CACHED_SCHEDULER);
assertThat(Scannable.from(cached).scan(Scannable.Attr.NAME))
.as("default parallel()")
.isEqualTo("parallel(" + Runtime.getRuntime().availableProcessors() + ",\"parallel\")");
.isEqualTo("parallel(" + Schedulers.DEFAULT_POOL_SIZE + ",\"parallel\")");

assertThat(Scannable.from(workerWithNamedFactory).scan(Scannable.Attr.NAME))
.as("workerWithNamedFactory")
Expand Down

0 comments on commit bd21f68

Please sign in to comment.