Skip to content

Commit

Permalink
Porting the Scheduler.when operator from 1.x to 2.x (#4827)
Browse files Browse the repository at this point in the history
  • Loading branch information
abersnaze authored and akarnokd committed Nov 10, 2016
1 parent 1d9283d commit c8303e1
Show file tree
Hide file tree
Showing 3 changed files with 610 additions and 0 deletions.
82 changes: 82 additions & 0 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.schedulers.SchedulerWhen;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -171,6 +174,85 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
return periodicTask;
}

/**
* Allows the use of operators for controlling the timing around when
* actions scheduled on workers are actually done. This makes it possible to
* layer additional behavior on this {@link Scheduler}. The only parameter
* is a function that flattens an {@link Flowable} of {@link Flowable}
* of {@link Completable}s into just one {@link Completable}. There must be
* a chain of operators connecting the returned value to the source
* {@link Flowable} otherwise any work scheduled on the returned
* {@link Scheduler} will not be executed.
* <p>
* When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
* {@link Completable}s is onNext'd to the combinator to be flattened. If
* the inner {@link Flowable} is not immediately subscribed to an calls to
* {@link Worker#schedule} are buffered. Once the {@link Flowable} is
* subscribed to actions are then onNext'd as {@link Completable}s.
* <p>
* Finally the actions scheduled on the parent {@link Scheduler} when the
* inner most {@link Completable}s are subscribed to.
* <p>
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
* onComplete and triggers any behavior in the flattening operator. The
* {@link Flowable} and all {@link Completable}s give to the flattening
* function never onError.
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Flowable.merge(workers), 2);
* });
* </pre>
* <p>
* This is a slightly different way to limit the concurrency but it has some
* interesting benefits and drawbacks to the method above. It works by
* limited the number of concurrent {@link Worker}s rather than individual
* actions. Generally each {@link Flowable} uses its own {@link Worker}.
* This means that this will essentially limit the number of concurrent
* subscribes. The danger comes from using operators like
* {@link Flowable#zip(Flowable, Flowable, rx.functions.Func2)} where
* subscribing to the first {@link Flowable} could deadlock the
* subscription to the second.
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Flowables two at a time
* return Completable.merge(Flowable.merge(workers, 2));
* });
* </pre>
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Flowable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
* <pre>
* Scheduler slowSched = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
* }));
* });
* </pre>
*
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
* @return the Scheduler with the customized execution behavior
*/
@SuppressWarnings("unchecked")
@Experimental
public <S extends Scheduler & Disposable> S when(Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}

/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
Expand Down
Loading

0 comments on commit c8303e1

Please sign in to comment.