Skip to content

Commit

Permalink
Cleanup of Eclipse warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jul 12, 2016
1 parent c6a7fa3 commit eafeba3
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 51 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ out
*.data
classes
exportToHtml
.pmd
.ruleset
.rulesets
56 changes: 31 additions & 25 deletions src/main/java/reactor/core/adapter/JdkFlowAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T>
return new PublisherAsFlowPublisher<>(publisher);
}

public static <T> Flux flowPublisherToPublisher(Flow.Publisher<T> publisher) {
public static <T> Flux<T> flowPublisherToPublisher(Flow.Publisher<T> publisher) {
return new FlowPublisherAsFlux<>(publisher);
}

Expand Down Expand Up @@ -68,27 +68,20 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
}
}

private static class FlowSubscriber<T> implements Subscriber<T> {
private static class FlowSubscriber<T> implements Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> subscriber;

Subscription subscription;

public FlowSubscriber(Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(final Subscription s) {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long l) {
s.request(l);
}

@Override
public void cancel() {
s.cancel();
}
});
this.subscription = s;
subscriber.onSubscribe(this);
}

@Override
Expand All @@ -105,29 +98,32 @@ public void onError(Throwable t) {
public void onComplete() {
subscriber.onComplete();
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
subscription.cancel();
}
}

private static class SubscriberToRS<T> implements Flow.Subscriber<T> {
private static class SubscriberToRS<T> implements Flow.Subscriber<T>, Subscription {

private final Subscriber<? super T> s;

Flow.Subscription subscription;

public SubscriberToRS(Subscriber<? super T> s) {
this.s = s;
}

@Override
public void onSubscribe(final Flow.Subscription subscription) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
subscription.cancel();
}
});
this.subscription = subscription;;
s.onSubscribe(this);
}

@Override
Expand All @@ -144,5 +140,15 @@ public void onError(Throwable throwable) {
public void onComplete() {
s.onComplete();
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
subscription.cancel();
}
}
}
15 changes: 7 additions & 8 deletions src/main/java/reactor/core/adapter/RxJava1Adapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public enum RxJava1Adapter {
/**
* @param completable
*
* @return
* @return a new Mono instance
*/
public static Mono<Void> completableToMono(rx.Completable completable) {
return new CompletableAsMono(completable);
Expand All @@ -59,9 +59,8 @@ public static Mono<Void> completableToMono(rx.Completable completable) {
*
* @param obs
* @param <T>
* @return
* @return a new Flux instance
*/
@SuppressWarnings("unchecked")
public static <T> Flux<T> observableToFlux(Observable<T> obs) {
if (obs == Observable.empty()) {
return Flux.empty();
Expand All @@ -77,7 +76,7 @@ public static <T> Flux<T> observableToFlux(Observable<T> obs) {
*
* @param publisher
* @param <T>
* @return
* @return a new Observable instance
*/
@SuppressWarnings("unchecked")
public static <T> Observable<T> publisherToObservable(final Publisher<T> publisher) {
Expand All @@ -97,7 +96,7 @@ public static <T> Observable<T> publisherToObservable(final Publisher<T> publish
*
* @param publisher
* @param <T>
* @return
* @return a new Single instance
*/
@SuppressWarnings("unchecked")
public static <T> rx.Single<T> publisherToSingle(Publisher<T> publisher) {
Expand All @@ -117,7 +116,7 @@ public static <T> rx.Single<T> publisherToSingle(Publisher<T> publisher) {
/**
* @param source
*
* @return
* @return a new Completable instance
*/
public static rx.Completable publisherToCompletable(Publisher<?> source) {
return rx.Completable.create(new PublisherAsCompletable(source));
Expand All @@ -127,7 +126,7 @@ public static rx.Completable publisherToCompletable(Publisher<?> source) {
* @param single
* @param <T>
*
* @return
* @return a new Mono instance
*/
public static <T> Mono<T> singleToMono(rx.Single<T> single) {
return new SingleAsMono<>(single);
Expand Down Expand Up @@ -448,7 +447,7 @@ else if (s == HAS_REQUEST_NO_VALUE) {
}
else {
this.value = value;
if (STATE.compareAndSet(this, s, NO_REQUEST_HAS_VALUE)) {
if (STATE.compareAndSet(this, NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,7 @@ public final Flux<T> delayMillis(long delay) {
* <img class="marble" src="https://raw.githubusercontent.com/reactor/projectreactor.io/master/src/main/static/assets/img/marble/delayonnext.png" alt="">
*
* @param delay period to delay each {@link Subscriber#onNext} call in milliseconds
* @param timer the timed scheduler to use for delaying each signal
*
* @return a throttled {@link Flux}
*
Expand Down Expand Up @@ -4419,6 +4420,7 @@ public final Cancellation subscribe(Consumer<? super T> consumer) {
* alt="">
*
* @param consumer the consumer to invoke on each value
* @param prefetch the the prefetch amount, positive
*
* @return a new {@link Cancellation} to dispose the {@link Subscription}
*/
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/reactor/core/publisher/FluxSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.reactivestreams.Subscriber;
import reactor.core.flow.Fuseable;
import reactor.core.flow.Receiver;
import reactor.core.state.Backpressurable;
import reactor.core.state.Introspectable;

/**
* A connecting {@link Flux} Publisher (right-to-left from a composition chain perspective)
Expand Down
1 change: 0 additions & 1 deletion src/main/java/reactor/core/publisher/MonoSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.reactivestreams.Subscriber;
import reactor.core.flow.Fuseable;
import reactor.core.flow.Receiver;
import reactor.core.state.Introspectable;
import reactor.core.util.Exceptions;

/**
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/reactor/core/publisher/ParallelFlux.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.flow.Cancellation;
import reactor.core.queue.QueueSupplier;
import reactor.core.scheduler.Scheduler;
import reactor.core.subscriber.LambdaSubscriber;
Expand Down Expand Up @@ -802,7 +801,7 @@ public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
onError, Runnable onComplete){

@SuppressWarnings("unchecked")
Subscriber<T>[] subscribers = (Subscriber<T>[])new Subscriber[parallelism()];
Subscriber<T>[] subscribers = new Subscriber[parallelism()];

int i = 0;
while(i < subscribers.length){
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/reactor/core/tuple/Tuple.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public abstract class Tuple implements Iterable, Serializable, Function {

static final long serialVersionUID = 8777121294502020843L;
static final Object[] emptyArray = new Object[0];
static final Tuple empty = new Tuple(0){};
@SuppressWarnings("serial")
static final Tuple empty = new Tuple(0){};


protected final int size;
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/reactor/core/adapter/RxJavaPublisherTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public long maxElementsFromPublisher() {
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Long> createPublisher(long elements) {
return RxJava1Adapter.observableToFlux(Observable.range
(0,
Expand All @@ -51,7 +50,6 @@ public Publisher<Long> createPublisher(long elements) {
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Long> createFailedPublisher() {
return RxJava1Adapter.observableToFlux(Observable.error(new Exception
("obs-test"))).cast(Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,11 @@ public long maxElementsFromPublisher() {
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Long> createPublisher(long elements) {
return RxJava1Adapter.singleToMono(Single.just(0)).cast(Long.class);
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Long> createFailedPublisher() {
return RxJava1Adapter.singleToMono(Single.error(new Exception("single-test")));
}
Expand Down
1 change: 0 additions & 1 deletion src/test/java/reactor/core/publisher/ParallelFluxTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.function.Supplier;

import org.junit.Assert;
import org.junit.Test;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class FizzBuzzTests extends AbstractReactorTest {
@Test
public void fizzTest() throws Throwable {
int numOfItems = 1024;
int batchSize = 8;
// int batchSize = 8;
final Timer timer = new Timer();
AtomicLong globalCounter = new AtomicLong();

Expand Down
12 changes: 6 additions & 6 deletions src/test/java/reactor/core/publisher/scenarios/FluxTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void mapManyFlushesAllValuesThoroughly() throws InterruptedException {
return Integer.parseInt(str);
}));

Cancellation tail = tasks.subscribe(i -> {
/* Cancellation tail =*/ tasks.subscribe(i -> {
latch.countDown();
});

Expand Down Expand Up @@ -386,7 +386,7 @@ public void mapNotifiesOnce() throws InterruptedException {
EmitterProcessor<Integer> d = EmitterProcessor.create();
SubmissionEmitter<Integer> s = SubmissionEmitter.create(d);

Cancellation c = d.publishOn(asyncGroup)
/*Cancellation c = */d.publishOn(asyncGroup)
.parallel(8)
.groups()
.subscribe(stream -> stream.publishOn(asyncGroup)
Expand Down Expand Up @@ -815,7 +815,7 @@ public void shouldCorrectlyDispatchComplexFlow() throws InterruptedException {

);

Cancellation action = s
/*Cancellation action = */s
.subscribe(integer -> {
latch.countDown();
System.out.println(integer);
Expand Down Expand Up @@ -893,7 +893,7 @@ public void testBeyondLongMaxMicroBatching() throws InterruptedException {
.log("before")
.publishOn(asyncGroup);

Cancellation tail = worker.log("after")
/*Cancellation tail = */worker.log("after")
.parallel(2)
.groups()
.subscribe(s -> s.log("w"+s.key())
Expand Down Expand Up @@ -1022,7 +1022,7 @@ public void shouldWindowCorrectly() throws InterruptedException {

CountDownLatch endLatch = new CountDownLatch(1000 / 100);

Cancellation controls = sensorDataStream
/*Cancellation controls = */sensorDataStream
/* step 2 */.window(100)
///* step 3 */.timeout(1000)
/* step 4 */
Expand Down Expand Up @@ -1094,7 +1094,7 @@ public void consistentMultithreadingWithPartition() throws InterruptedException

CountDownLatch latch = new CountDownLatch(10);

Cancellation c = Flux.range(1, 10)
/*Cancellation c = */Flux.range(1, 10)
.groupBy(n -> n % 2 == 0)
.flatMap(stream -> stream.publishOn(supplier1)
.log("groupBy-" + stream.key()))
Expand Down

0 comments on commit eafeba3

Please sign in to comment.