Skip to content

2.x: Add scheduler creation factories #5002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 25, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* Holds a fixed pool of worker threads and assigns them
* to requested Scheduler.Workers in a round-robin fashion.
*/
public final class ComputationScheduler extends Scheduler {
/** This will indicate no pool is active. */
static final FixedSchedulerPool NONE = new FixedSchedulerPool(0);
static final FixedSchedulerPool NONE;
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool";
static final RxThreadFactory THREAD_FACTORY;
Expand All @@ -42,6 +42,7 @@ public final class ComputationScheduler extends Scheduler {

static final PoolWorker SHUTDOWN_WORKER;

final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority";
Expand All @@ -56,6 +57,9 @@ public final class ComputationScheduler extends Scheduler {
Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY)));

THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);

NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
NONE.shutdown();
}

static int cap(int cpuCount, int paramThreads) {
Expand All @@ -68,12 +72,12 @@ static final class FixedSchedulerPool {
final PoolWorker[] eventLoops;
long n;

FixedSchedulerPool(int maxThreads) {
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}

Expand All @@ -98,6 +102,18 @@ public void shutdown() {
* count and using least-recent worker selection policy.
*/
public ComputationScheduler() {
this(THREAD_FACTORY);
}

/**
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
Expand All @@ -121,7 +137,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo

@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS);
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
30 changes: 21 additions & 9 deletions src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
Expand All @@ -37,16 +37,14 @@ public final class IoScheduler extends Scheduler {
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";

static final CachedWorkerPool NONE;
static {
NONE = new CachedWorkerPool(0, null);
NONE.shutdown();

SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();

Expand All @@ -56,6 +54,9 @@ public final class IoScheduler extends Scheduler {
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason why this was moved to the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It requires WORKER_THREAD_FACTORY to be initialized now since it's a parameter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

NONE.shutdown();
}

static final class CachedWorkerPool implements Runnable {
Expand All @@ -64,11 +65,13 @@ static final class CachedWorkerPool implements Runnable {
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

ScheduledExecutorService evictor = null;
Future<?> task = null;
Expand Down Expand Up @@ -97,7 +100,7 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
Expand Down Expand Up @@ -143,13 +146,22 @@ void shutdown() {
}

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

import io.reactivex.Scheduler;

import java.util.concurrent.ThreadFactory;

/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {

final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;

private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

Expand All @@ -38,16 +40,16 @@ public final class NewThreadScheduler extends Scheduler {
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}

public static NewThreadScheduler instance() {
return INSTANCE;
public NewThreadScheduler() {
this(THREAD_FACTORY);
}

private NewThreadScheduler() {

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
return new NewThreadWorker(threadFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* A scheduler with a shared, single threaded underlying ScheduledExecutorService.
* @since 2.0
*/
public final class SingleScheduler extends Scheduler {

final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

/** The name of the system property for setting the thread priority for this Scheduler. */
Expand All @@ -47,11 +48,20 @@ public final class SingleScheduler extends Scheduler {
}

public SingleScheduler() {
executor.lazySet(createExecutor());
this(SINGLE_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}

static ScheduledExecutorService createExecutor() {
return SchedulerPoolFactory.create(SINGLE_THREAD_FACTORY);
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}

@Override
Expand All @@ -66,7 +76,7 @@ public void start() {
return;
}
if (next == null) {
next = createExecutor();
next = createExecutor(threadFactory);
}
if (executor.compareAndSet(current, next)) {
return;
Expand Down
66 changes: 60 additions & 6 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
*/
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import io.reactivex.internal.functions.ObjectHelper;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.*;

/**
* Utility class to inject handlers to certain standard RxJava operations.
Expand Down Expand Up @@ -926,6 +928,58 @@ public static Completable onAssembly(Completable source) {
return source;
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add notes to all of these methods something like this:

Note that the returned Scheduler must be shut down manually if the ThreadFactory doesn't create a daemon thread, otherwise the JVM may not quit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "Note that this takes precedence..." may be misleading. The number of threads is still based on the system configuration. I'd remove this sentence entirely

* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createComputationScheduler(ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createSingleScheduler(ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
import io.reactivex.internal.schedulers.*;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;

/**
* Static factory methods for returning standard Scheduler instances.
* <p>
Expand Down Expand Up @@ -58,7 +57,7 @@ static final class IoHolder {
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
static final Scheduler DEFAULT = new NewThreadScheduler();
}

static {
Expand Down
Loading