From 71330c06186d72548a3443e8b22ef52d995d0e64 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 4 Jan 2017 16:21:40 +0100 Subject: [PATCH] 2.x: add sample() overload that can emit the very last buffered item (#4955) --- src/main/java/io/reactivex/Flowable.java | 116 ++++++++++++++- src/main/java/io/reactivex/Observable.java | 104 ++++++++++++- .../flowable/FlowableSamplePublisher.java | 101 +++++++++++-- .../flowable/FlowableSampleTimed.java | 69 ++++++++- .../observable/ObservableSampleTimed.java | 74 +++++++++- .../ObservableSampleWithObservable.java | 102 +++++++++++-- .../flowable/FlowableSampleTest.java | 136 ++++++++++++++++- .../observable/ObservableSampleTest.java | 137 +++++++++++++++++- 8 files changed, 801 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index bf954c9eab..c05fab1448 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -11530,6 +11530,41 @@ public final Flowable sample(long period, TimeUnit unit) { return sample(period, unit, Schedulers.computation()); } + /** + * Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher + * within periodic time intervals and optionally emit the very last upstream item when the upstream completes. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
{@code sample} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @return a Flowable that emits the results of sampling the items emitted by the source Publisher at + * the specified time interval + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + * @see #throttleLast(long, TimeUnit) + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.COMPUTATION) + @Experimental + public final Flowable sample(long period, TimeUnit unit, boolean emitLast) { + return sample(period, unit, Schedulers.computation(), emitLast); + } + /** * Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher * within periodic time intervals, where the intervals are defined on a particular Scheduler. @@ -11560,7 +11595,47 @@ public final Flowable sample(long period, TimeUnit unit) { public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableSampleTimed(this, period, unit, scheduler)); + return RxJavaPlugins.onAssembly(new FlowableSampleTimed(this, period, unit, scheduler, false)); + } + + /** + * Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher + * within periodic time intervals, where the intervals are defined on a particular Scheduler + * and optionally emit the very last upstream item when the upstream completes. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@link Scheduler} this operator will use
+ *
+ * + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @param scheduler + * the {@link Scheduler} to use when sampling + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @return a Flowable that emits the results of sampling the items emitted by the source Publisher at + * the specified time interval + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + * @see #throttleLast(long, TimeUnit, Scheduler) + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + @Experimental + public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new FlowableSampleTimed(this, period, unit, scheduler, emitLast)); } /** @@ -11590,7 +11665,44 @@ public final Flowable sample(long period, TimeUnit unit, Scheduler scheduler) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable sample(Publisher sampler) { ObjectHelper.requireNonNull(sampler, "sampler is null"); - return RxJavaPlugins.onAssembly(new FlowableSamplePublisher(this, sampler)); + return RxJavaPlugins.onAssembly(new FlowableSamplePublisher(this, sampler, false)); + } + + /** + * Returns a Flowable that, when the specified {@code sampler} Publisher emits an item or completes, + * emits the most recently emitted item (if any) emitted by the source Publisher since the previous + * emission from the {@code sampler} Publisher + * and optionally emit the very last upstream item when the upstream or other Publisher complete. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses the emissions of the {@code sampler} + * Publisher to control data flow.
+ *
Scheduler:
+ *
This version of {@code sample} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the sampler Publisher + * @param sampler + * the Publisher to use for sampling the source Publisher + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @return a Flowable that emits the results of sampling the items emitted by this Publisher whenever + * the {@code sampler} Publisher emits an item or completes + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable sample(Publisher sampler, boolean emitLast) { + ObjectHelper.requireNonNull(sampler, "sampler is null"); + return RxJavaPlugins.onAssembly(new FlowableSamplePublisher(this, sampler, emitLast)); } /** diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 96de4fd6c4..57eed27352 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -9610,6 +9610,37 @@ public final Observable sample(long period, TimeUnit unit) { return sample(period, unit, Schedulers.computation()); } + /** + * Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource + * within periodic time intervals and optionally emit the very last upstream item when the upstream completes. + *

+ * + *

+ *
Scheduler:
+ *
{@code sample} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at + * the specified time interval + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @see ReactiveX operators documentation: Sample + * @see #throttleLast(long, TimeUnit) + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.COMPUTATION) + @Experimental + public final Observable sample(long period, TimeUnit unit, boolean emitLast) { + return sample(period, unit, Schedulers.computation(), emitLast); + } + /** * Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource * within periodic time intervals, where the intervals are defined on a particular Scheduler. @@ -9636,7 +9667,43 @@ public final Observable sample(long period, TimeUnit unit) { public final Observable sample(long period, TimeUnit unit, Scheduler scheduler) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableSampleTimed(this, period, unit, scheduler)); + return RxJavaPlugins.onAssembly(new ObservableSampleTimed(this, period, unit, scheduler, false)); + } + + /** + * Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource + * within periodic time intervals, where the intervals are defined on a particular Scheduler + * and optionally emit the very last upstream item when the upstream completes. + *

+ * + *

+ *
Scheduler:
+ *
You specify which {@link Scheduler} this operator will use
+ *
+ * + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @param scheduler + * the {@link Scheduler} to use when sampling + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at + * the specified time interval + * @see ReactiveX operators documentation: Sample + * @see #throttleLast(long, TimeUnit, Scheduler) + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @Experimental + public final Observable sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new ObservableSampleTimed(this, period, unit, scheduler, emitLast)); } /** @@ -9662,7 +9729,40 @@ public final Observable sample(long period, TimeUnit unit, Scheduler schedule @SchedulerSupport(SchedulerSupport.NONE) public final Observable sample(ObservableSource sampler) { ObjectHelper.requireNonNull(sampler, "sampler is null"); - return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable(this, sampler)); + return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable(this, sampler, false)); + } + + /** + * Returns an Observable that, when the specified {@code sampler} ObservableSource emits an item or completes, + * emits the most recently emitted item (if any) emitted by the source ObservableSource since the previous + * emission from the {@code sampler} ObservableSource + * and optionally emit the very last upstream item when the upstream or other ObservableSource complete. + *

+ * + *

+ * ObservableSource to control data flow. + *
Scheduler:
+ *
This version of {@code sample} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the sampler ObservableSource + * @param sampler + * the ObservableSource to use for sampling the source ObservableSource + * @param emitLast + * if true and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if false, an unsampled last item is ignored. + * @return an Observable that emits the results of sampling the items emitted by this ObservableSource whenever + * the {@code sampler} ObservableSource emits an item or completes + * @see ReactiveX operators documentation: Sample + * @since 2.0.5 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable sample(ObservableSource sampler, boolean emitLast) { + ObjectHelper.requireNonNull(sampler, "sampler is null"); + return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable(this, sampler, emitLast)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java index 76de888f5a..356d7eac3a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java @@ -27,18 +27,25 @@ public final class FlowableSamplePublisher extends Flowable { final Publisher source; final Publisher other; - public FlowableSamplePublisher(Publisher source, Publisher other) { + final boolean emitLast; + + public FlowableSamplePublisher(Publisher source, Publisher other, boolean emitLast) { this.source = source; this.other = other; + this.emitLast = emitLast; } @Override protected void subscribeActual(Subscriber s) { SerializedSubscriber serial = new SerializedSubscriber(s); - source.subscribe(new SamplePublisherSubscriber(serial, other)); + if (emitLast) { + source.subscribe(new SampleMainEmitLast(serial, other)); + } else { + source.subscribe(new SampleMainNoLast(serial, other)); + } } - static final class SamplePublisherSubscriber extends AtomicReference implements Subscriber, Subscription { + abstract static class SamplePublisherSubscriber extends AtomicReference implements Subscriber, Subscription { private static final long serialVersionUID = -3517602651313910099L; @@ -83,7 +90,7 @@ public void onError(Throwable t) { @Override public void onComplete() { SubscriptionHelper.cancel(other); - actual.onComplete(); + completeMain(); } boolean setOther(Subscription o) { @@ -104,16 +111,16 @@ public void cancel() { } public void error(Throwable e) { - cancel(); + s.cancel(); actual.onError(e); } public void complete() { - cancel(); - actual.onComplete(); + s.cancel(); + completeOther(); } - public void emit() { + void emit() { T value = getAndSet(null); if (value != null) { long r = requested.get(); @@ -126,6 +133,12 @@ public void emit() { } } } + + abstract void completeMain(); + + abstract void completeOther(); + + abstract void run(); } static final class SamplerSubscriber implements Subscriber { @@ -144,7 +157,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Object t) { - parent.emit(); + parent.run(); } @Override @@ -157,4 +170,74 @@ public void onComplete() { parent.complete(); } } + + static final class SampleMainNoLast extends SamplePublisherSubscriber { + + private static final long serialVersionUID = -3029755663834015785L; + + SampleMainNoLast(Subscriber actual, Publisher other) { + super(actual, other); + } + + @Override + void completeMain() { + actual.onComplete(); + } + + @Override + void completeOther() { + actual.onComplete(); + } + + @Override + void run() { + emit(); + } + } + + static final class SampleMainEmitLast extends SamplePublisherSubscriber { + + private static final long serialVersionUID = -3029755663834015785L; + + final AtomicInteger wip; + + volatile boolean done; + + SampleMainEmitLast(Subscriber actual, Publisher other) { + super(actual, other); + this.wip = new AtomicInteger(); + } + + @Override + void completeMain() { + done = true; + if (wip.getAndIncrement() == 0) { + emit(); + actual.onComplete(); + } + } + + @Override + void completeOther() { + done = true; + if (wip.getAndIncrement() == 0) { + emit(); + actual.onComplete(); + } + } + + @Override + void run() { + if (wip.getAndIncrement() == 0) { + do { + boolean d = done; + emit(); + if (d) { + actual.onComplete(); + return; + } + } while (wip.decrementAndGet() != 0); + } + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java index 9e4045bb51..9c93da30ab 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java @@ -30,20 +30,27 @@ public final class FlowableSampleTimed extends AbstractFlowableWithUpstream source, long period, TimeUnit unit, Scheduler scheduler) { + final boolean emitLast; + + public FlowableSampleTimed(Publisher source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { super(source); this.period = period; this.unit = unit; this.scheduler = scheduler; + this.emitLast = emitLast; } @Override protected void subscribeActual(Subscriber s) { SerializedSubscriber serial = new SerializedSubscriber(s); - source.subscribe(new SampleTimedSubscriber(serial, period, unit, scheduler)); + if (emitLast) { + source.subscribe(new SampleTimedEmitLast(serial, period, unit, scheduler)); + } else { + source.subscribe(new SampleTimedNoLast(serial, period, unit, scheduler)); + } } - static final class SampleTimedSubscriber extends AtomicReference implements Subscriber, Subscription, Runnable { + abstract static class SampleTimedSubscriber extends AtomicReference implements Subscriber, Subscription, Runnable { private static final long serialVersionUID = -3517602651313910099L; @@ -89,7 +96,7 @@ public void onError(Throwable t) { @Override public void onComplete() { cancelTimer(); - actual.onComplete(); + complete(); } void cancelTimer() { @@ -109,8 +116,7 @@ public void cancel() { s.cancel(); } - @Override - public void run() { + void emit() { T value = getAndSet(null); if (value != null) { long r = requested.get(); @@ -123,5 +129,56 @@ public void run() { } } } + + abstract void complete(); + } + + static final class SampleTimedNoLast extends SampleTimedSubscriber { + + private static final long serialVersionUID = -7139995637533111443L; + + SampleTimedNoLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler) { + super(actual, period, unit, scheduler); + } + + @Override + void complete() { + actual.onComplete(); + } + + @Override + public void run() { + emit(); + } + } + + static final class SampleTimedEmitLast extends SampleTimedSubscriber { + + private static final long serialVersionUID = -7139995637533111443L; + + final AtomicInteger wip; + + SampleTimedEmitLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler) { + super(actual, period, unit, scheduler); + this.wip = new AtomicInteger(1); + } + + @Override + void complete() { + emit(); + if (wip.decrementAndGet() == 0) { + actual.onComplete(); + } + } + + @Override + public void run() { + if (wip.incrementAndGet() == 2) { + emit(); + if (wip.decrementAndGet() == 0) { + actual.onComplete(); + } + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java index a5312ece30..b5b06967c9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.observable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; @@ -26,21 +26,28 @@ public final class ObservableSampleTimed extends AbstractObservableWithUpstre final TimeUnit unit; final Scheduler scheduler; - public ObservableSampleTimed(ObservableSource source, long period, TimeUnit unit, Scheduler scheduler) { + final boolean emitLast; + + public ObservableSampleTimed(ObservableSource source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { super(source); this.period = period; this.unit = unit; this.scheduler = scheduler; + this.emitLast = emitLast; } @Override public void subscribeActual(Observer t) { SerializedObserver serial = new SerializedObserver(t); - source.subscribe(new SampleTimedObserver(serial, period, unit, scheduler)); + if (emitLast) { + source.subscribe(new SampleTimedEmitLast(serial, period, unit, scheduler)); + } else { + source.subscribe(new SampleTimedNoLast(serial, period, unit, scheduler)); + } } - static final class SampleTimedObserver extends AtomicReference implements Observer, Disposable, Runnable { + abstract static class SampleTimedObserver extends AtomicReference implements Observer, Disposable, Runnable { private static final long serialVersionUID = -3517602651313910099L; @@ -85,7 +92,7 @@ public void onError(Throwable t) { @Override public void onComplete() { cancelTimer(); - actual.onComplete(); + complete(); } void cancelTimer() { @@ -98,16 +105,67 @@ public void dispose() { s.dispose(); } - @Override public boolean isDisposed() { + @Override + public boolean isDisposed() { return s.isDisposed(); } - @Override - public void run() { + void emit() { T value = getAndSet(null); if (value != null) { actual.onNext(value); } } + + abstract void complete(); + } + + static final class SampleTimedNoLast extends SampleTimedObserver { + + private static final long serialVersionUID = -7139995637533111443L; + + SampleTimedNoLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler) { + super(actual, period, unit, scheduler); + } + + @Override + void complete() { + actual.onComplete(); + } + + @Override + public void run() { + emit(); + } + } + + static final class SampleTimedEmitLast extends SampleTimedObserver { + + private static final long serialVersionUID = -7139995637533111443L; + + final AtomicInteger wip; + + SampleTimedEmitLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler) { + super(actual, period, unit, scheduler); + this.wip = new AtomicInteger(1); + } + + @Override + void complete() { + emit(); + if (wip.decrementAndGet() == 0) { + actual.onComplete(); + } + } + + @Override + public void run() { + if (wip.incrementAndGet() == 2) { + emit(); + if (wip.decrementAndGet() == 0) { + actual.onComplete(); + } + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java index 099284309e..37d85cf4cc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java @@ -13,28 +13,36 @@ package io.reactivex.internal.operators.observable; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.observers.SerializedObserver; public final class ObservableSampleWithObservable extends AbstractObservableWithUpstream { + final ObservableSource other; - public ObservableSampleWithObservable(ObservableSource source, ObservableSource other) { + final boolean emitLast; + + public ObservableSampleWithObservable(ObservableSource source, ObservableSource other, boolean emitLast) { super(source); this.other = other; + this.emitLast = emitLast; } @Override public void subscribeActual(Observer t) { SerializedObserver serial = new SerializedObserver(t); - source.subscribe(new SampleMainObserver(serial, other)); + if (emitLast) { + source.subscribe(new SampleMainEmitLast(serial, other)); + } else { + source.subscribe(new SampleMainNoLast(serial, other)); + } } - static final class SampleMainObserver extends AtomicReference + abstract static class SampleMainObserver extends AtomicReference implements Observer, Disposable { private static final long serialVersionUID = -3517602651313910099L; @@ -76,7 +84,7 @@ public void onError(Throwable t) { @Override public void onComplete() { DisposableHelper.dispose(other); - actual.onComplete(); + completeMain(); } boolean setOther(Disposable o) { @@ -101,15 +109,21 @@ public void error(Throwable e) { public void complete() { s.dispose(); - actual.onComplete(); + completeOther(); } - public void emit() { + void emit() { T value = getAndSet(null); if (value != null) { actual.onNext(value); } } + + abstract void completeMain(); + + abstract void completeOther(); + + abstract void run(); } static final class SamplerObserver implements Observer { @@ -126,7 +140,7 @@ public void onSubscribe(Disposable s) { @Override public void onNext(Object t) { - parent.emit(); + parent.run(); } @Override @@ -139,4 +153,74 @@ public void onComplete() { parent.complete(); } } + + static final class SampleMainNoLast extends SampleMainObserver { + + private static final long serialVersionUID = -3029755663834015785L; + + SampleMainNoLast(Observer actual, ObservableSource other) { + super(actual, other); + } + + @Override + void completeMain() { + actual.onComplete(); + } + + @Override + void completeOther() { + actual.onComplete(); + } + + @Override + void run() { + emit(); + } + } + + static final class SampleMainEmitLast extends SampleMainObserver { + + private static final long serialVersionUID = -3029755663834015785L; + + final AtomicInteger wip; + + volatile boolean done; + + SampleMainEmitLast(Observer actual, ObservableSource other) { + super(actual, other); + this.wip = new AtomicInteger(); + } + + @Override + void completeMain() { + done = true; + if (wip.getAndIncrement() == 0) { + emit(); + actual.onComplete(); + } + } + + @Override + void completeOther() { + done = true; + if (wip.getAndIncrement() == 0) { + emit(); + actual.onComplete(); + } + } + + @Override + void run() { + if (wip.getAndIncrement() == 0) { + do { + boolean d = done; + emit(); + if (d) { + actual.onComplete(); + return; + } + } while (wip.decrementAndGet() != 0); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java index 6b17041a18..08def21334 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java @@ -26,7 +26,8 @@ import io.reactivex.exceptions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.*; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; +import io.reactivex.subscribers.TestSubscriber; public class FlowableSampleTest { private TestScheduler scheduler; @@ -310,4 +311,137 @@ public void backpressureOverflowWithOtherPublisher() { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(MissingBackpressureException.class); } + + @Test + public void emitLastTimed() { + Flowable.just(1) + .sample(1, TimeUnit.DAYS, true) + .test() + .assertResult(1); + } + + @Test + public void emitLastTimedEmpty() { + Flowable.empty() + .sample(1, TimeUnit.DAYS, true) + .test() + .assertResult(); + } + + @Test + public void emitLastTimedCustomScheduler() { + Flowable.just(1) + .sample(1, TimeUnit.DAYS, Schedulers.single(), true) + .test() + .assertResult(1); + } + + @Test + public void emitLastTimedRunCompleteRace() { + for (int i = 0; i < 1000; i++) { + final TestScheduler scheduler = new TestScheduler(); + + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.sample(1, TimeUnit.SECONDS, scheduler, true) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } + + @Test + public void emitLastOther() { + Flowable.just(1) + .sample(Flowable.timer(1, TimeUnit.DAYS), true) + .test() + .assertResult(1); + } + + @Test + public void emitLastOtherEmpty() { + Flowable.empty() + .sample(Flowable.timer(1, TimeUnit.DAYS), true) + .test() + .assertResult(); + } + + @Test + public void emitLastOtherRunCompleteRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final PublishProcessor sampler = PublishProcessor.create(); + + TestSubscriber ts = pp.sample(sampler, true) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sampler.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } + + @Test + public void emitLastOtherCompleteCompleteRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final PublishProcessor sampler = PublishProcessor.create(); + + TestSubscriber ts = pp.sample(sampler, true).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sampler.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java index 2c791c759e..5e589b2d0a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java @@ -24,7 +24,8 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; public class ObservableSampleTest { @@ -291,4 +292,138 @@ public void error() { .test() .assertFailure(TestException.class); } + + @Test + public void emitLastTimed() { + Observable.just(1) + .sample(1, TimeUnit.DAYS, true) + .test() + .assertResult(1); + } + + @Test + public void emitLastTimedEmpty() { + Observable.empty() + .sample(1, TimeUnit.DAYS, true) + .test() + .assertResult(); + } + + @Test + public void emitLastTimedCustomScheduler() { + Observable.just(1) + .sample(1, TimeUnit.DAYS, Schedulers.single(), true) + .test() + .assertResult(1); + } + + @Test + public void emitLastTimedRunCompleteRace() { + for (int i = 0; i < 1000; i++) { + final TestScheduler scheduler = new TestScheduler(); + + final PublishSubject pp = PublishSubject.create(); + + TestObserver ts = pp.sample(1, TimeUnit.SECONDS, scheduler, true) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } + + @Test + public void emitLastOther() { + Observable.just(1) + .sample(Observable.timer(1, TimeUnit.DAYS), true) + .test() + .assertResult(1); + } + + @Test + public void emitLastOtherEmpty() { + Observable.empty() + .sample(Observable.timer(1, TimeUnit.DAYS), true) + .test() + .assertResult(); + } + + @Test + public void emitLastOtherRunCompleteRace() { + for (int i = 0; i < 1000; i++) { + final PublishSubject pp = PublishSubject.create(); + final PublishSubject sampler = PublishSubject.create(); + + TestObserver ts = pp.sample(sampler, true) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sampler.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } + + @Test + public void emitLastOtherCompleteCompleteRace() { + for (int i = 0; i < 1000; i++) { + final PublishSubject pp = PublishSubject.create(); + final PublishSubject sampler = PublishSubject.create(); + + TestObserver ts = pp.sample(sampler, true).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sampler.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } + }