Skip to content

2.x: Add @CheckReturnValue to create methods of Subjects + Processors #4971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -49,6 +50,7 @@ public final class AsyncProcessor<T> extends FlowableProcessor<T> {
* @param <T> the value type to be received and emitted
* @return the new AsyncProcessor instance
*/
@CheckReturnValue
public static <T> AsyncProcessor<T> create() {
return new AsyncProcessor<T>();
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
Expand Down Expand Up @@ -97,6 +98,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
* the type of item the Subject will emit
* @return the constructed {@link BehaviorProcessor}
*/
@CheckReturnValue
public static <T> BehaviorProcessor<T> create() {
return new BehaviorProcessor<T>();
}
Expand All @@ -112,6 +114,7 @@ public static <T> BehaviorProcessor<T> create() {
* {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable}
* @return the constructed {@link BehaviorProcessor}
*/
@CheckReturnValue
public static <T> BehaviorProcessor<T> createDefault(T defaultValue) {
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
return new BehaviorProcessor<T>(defaultValue);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/processors/PublishProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;
Expand Down Expand Up @@ -74,6 +75,7 @@ public final class PublishProcessor<T> extends FlowableProcessor<T> {
* @param <T> the value type
* @return the new PublishProcessor
*/
@CheckReturnValue
public static <T> PublishProcessor<T> create() {
return new PublishProcessor<T>();
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class ReplayProcessor<T> extends FlowableProcessor<T> {
* the type of items observed and emitted by the ReplayProcessor
* @return the created ReplayProcessor
*/
@CheckReturnValue
public static <T> ReplayProcessor<T> create() {
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(16));
}
Expand All @@ -108,6 +110,7 @@ public static <T> ReplayProcessor<T> create() {
* the initial buffer capacity
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplayProcessor<T> create(int capacityHint) {
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(capacityHint));
}
Expand All @@ -132,6 +135,7 @@ public static <T> ReplayProcessor<T> create(int capacityHint) {
* the maximum number of buffered items
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
return new ReplayProcessor<T>(new SizeBoundReplayBuffer<T>(maxSize));
}
Expand Down Expand Up @@ -185,6 +189,7 @@ public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
}
Expand Down Expand Up @@ -223,6 +228,7 @@ public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit,
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/processors/UnicastProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;
Expand Down Expand Up @@ -65,6 +66,7 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {
* @param <T> the value type
* @return an UnicastSubject instance
*/
@CheckReturnValue
public static <T> UnicastProcessor<T> create() {
return new UnicastProcessor<T>(bufferSize());
}
Expand All @@ -75,6 +77,7 @@ public static <T> UnicastProcessor<T> create() {
* @param capacityHint the hint to size the internal unbounded buffer
* @return an UnicastProcessor instance
*/
@CheckReturnValue
public static <T> UnicastProcessor<T> create(int capacityHint) {
return new UnicastProcessor<T>(capacityHint);
}
Expand All @@ -91,6 +94,7 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
* @param onCancelled the non null callback
* @return an UnicastProcessor instance
*/
@CheckReturnValue
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
return new UnicastProcessor<T>(capacityHint, onCancelled);
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -51,6 +52,7 @@ public final class AsyncSubject<T> extends Subject<T> {
* @param <T> the value type to be received and emitted
* @return the new AsyncProcessor instance
*/
@CheckReturnValue
public static <T> AsyncSubject<T> create() {
return new AsyncSubject<T>();
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.*;
Expand Down Expand Up @@ -96,6 +97,7 @@ public final class BehaviorSubject<T> extends Subject<T> {
* the type of item the Subject will emit
* @return the constructed {@link BehaviorSubject}
*/
@CheckReturnValue
public static <T> BehaviorSubject<T> create() {
return new BehaviorSubject<T>();
}
Expand All @@ -111,6 +113,7 @@ public static <T> BehaviorSubject<T> create() {
* {@link BehaviorSubject} has not yet observed any items from its source {@code Observable}
* @return the constructed {@link BehaviorSubject}
*/
@CheckReturnValue
public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
return new BehaviorSubject<T>(defaultValue);
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
Expand Down Expand Up @@ -63,6 +64,7 @@ public final class PublishSubject<T> extends Subject<T> {
* @param <T> the value type
* @return the new PublishSubject
*/
@CheckReturnValue
public static <T> PublishSubject<T> create() {
return new PublishSubject<T>();
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.NotificationLite;
Expand Down Expand Up @@ -74,6 +75,7 @@ public final class ReplaySubject<T> extends Subject<T> {
* the type of items observed and emitted by the Subject
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplaySubject<T> create() {
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(16));
}
Expand All @@ -93,6 +95,7 @@ public static <T> ReplaySubject<T> create() {
* the initial buffer capacity
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplaySubject<T> create(int capacityHint) {
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(capacityHint));
}
Expand All @@ -117,6 +120,7 @@ public static <T> ReplaySubject<T> create(int capacityHint) {
* the maximum number of buffered items
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplaySubject<T> createWithSize(int maxSize) {
return new ReplaySubject<T>(new SizeBoundReplayBuffer<T>(maxSize));
}
Expand Down Expand Up @@ -170,6 +174,7 @@ public static <T> ReplaySubject<T> createWithSize(int maxSize) {
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
}
Expand Down Expand Up @@ -208,6 +213,7 @@ public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Sc
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
@CheckReturnValue
public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
Expand Down Expand Up @@ -73,6 +74,7 @@ public final class UnicastSubject<T> extends Subject<T> {
* @param <T> the value type
* @return an UnicastSubject instance
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create() {
return new UnicastSubject<T>(bufferSize());
}
Expand All @@ -83,6 +85,7 @@ public static <T> UnicastSubject<T> create() {
* @param capacityHint the hint to size the internal unbounded buffer
* @return an UnicastSubject instance
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create(int capacityHint) {
return new UnicastSubject<T>(capacityHint);
}
Expand All @@ -99,6 +102,7 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
* @param onCancelled the non null callback
* @return an UnicastSubject instance
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onCancelled) {
return new UnicastSubject<T>(capacityHint, onCancelled);
}
Expand Down