diff --git a/.gitignore b/.gitignore index 54bc2531f6..acb3f2ec9c 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ out *.data classes exportToHtml +.pmd +.ruleset +.rulesets \ No newline at end of file diff --git a/src/main/java/reactor/core/adapter/JdkFlowAdapter.java b/src/main/java/reactor/core/adapter/JdkFlowAdapter.java index d91f090f63..5c1ebbc1b7 100644 --- a/src/main/java/reactor/core/adapter/JdkFlowAdapter.java +++ b/src/main/java/reactor/core/adapter/JdkFlowAdapter.java @@ -38,7 +38,7 @@ public static Flow.Publisher publisherToFlowPublisher(final Publisher return new PublisherAsFlowPublisher<>(publisher); } - public static Flux flowPublisherToPublisher(Flow.Publisher publisher) { + public static Flux flowPublisherToPublisher(Flow.Publisher publisher) { return new FlowPublisherAsFlux<>(publisher); } @@ -68,9 +68,11 @@ public void subscribe(Flow.Subscriber subscriber) { } } - private static class FlowSubscriber implements Subscriber { + private static class FlowSubscriber implements Subscriber, Flow.Subscription { private final Flow.Subscriber subscriber; + + Subscription subscription; public FlowSubscriber(Flow.Subscriber subscriber) { this.subscriber = subscriber; @@ -78,17 +80,8 @@ public FlowSubscriber(Flow.Subscriber subscriber) { @Override public void onSubscribe(final Subscription s) { - subscriber.onSubscribe(new Flow.Subscription() { - @Override - public void request(long l) { - s.request(l); - } - - @Override - public void cancel() { - s.cancel(); - } - }); + this.subscription = s; + subscriber.onSubscribe(this); } @Override @@ -105,11 +98,23 @@ public void onError(Throwable t) { public void onComplete() { subscriber.onComplete(); } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } } - private static class SubscriberToRS implements Flow.Subscriber { + private static class SubscriberToRS implements Flow.Subscriber, Subscription { private final Subscriber s; + + Flow.Subscription subscription; public SubscriberToRS(Subscriber s) { this.s = s; @@ -117,17 +122,8 @@ public SubscriberToRS(Subscriber s) { @Override public void onSubscribe(final Flow.Subscription subscription) { - s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - subscription.request(n); - } - - @Override - public void cancel() { - subscription.cancel(); - } - }); + this.subscription = subscription;; + s.onSubscribe(this); } @Override @@ -144,5 +140,15 @@ public void onError(Throwable throwable) { public void onComplete() { s.onComplete(); } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } } } diff --git a/src/main/java/reactor/core/adapter/RxJava1Adapter.java b/src/main/java/reactor/core/adapter/RxJava1Adapter.java index 3283e45704..a467e19110 100644 --- a/src/main/java/reactor/core/adapter/RxJava1Adapter.java +++ b/src/main/java/reactor/core/adapter/RxJava1Adapter.java @@ -49,7 +49,7 @@ public enum RxJava1Adapter { /** * @param completable * - * @return + * @return a new Mono instance */ public static Mono completableToMono(rx.Completable completable) { return new CompletableAsMono(completable); @@ -59,9 +59,8 @@ public static Mono completableToMono(rx.Completable completable) { * * @param obs * @param - * @return + * @return a new Flux instance */ - @SuppressWarnings("unchecked") public static Flux observableToFlux(Observable obs) { if (obs == Observable.empty()) { return Flux.empty(); @@ -77,7 +76,7 @@ public static Flux observableToFlux(Observable obs) { * * @param publisher * @param - * @return + * @return a new Observable instance */ @SuppressWarnings("unchecked") public static Observable publisherToObservable(final Publisher publisher) { @@ -97,7 +96,7 @@ public static Observable publisherToObservable(final Publisher publish * * @param publisher * @param - * @return + * @return a new Single instance */ @SuppressWarnings("unchecked") public static rx.Single publisherToSingle(Publisher publisher) { @@ -117,7 +116,7 @@ public static rx.Single publisherToSingle(Publisher publisher) { /** * @param source * - * @return + * @return a new Completable instance */ public static rx.Completable publisherToCompletable(Publisher source) { return rx.Completable.create(new PublisherAsCompletable(source)); @@ -127,7 +126,7 @@ public static rx.Completable publisherToCompletable(Publisher source) { * @param single * @param * - * @return + * @return a new Mono instance */ public static Mono singleToMono(rx.Single single) { return new SingleAsMono<>(single); @@ -448,7 +447,7 @@ else if (s == HAS_REQUEST_NO_VALUE) { } else { this.value = value; - if (STATE.compareAndSet(this, s, NO_REQUEST_HAS_VALUE)) { + if (STATE.compareAndSet(this, NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) { break; } } diff --git a/src/main/java/reactor/core/publisher/Flux.java b/src/main/java/reactor/core/publisher/Flux.java index 1eda9308e7..fdbe6dc3f8 100644 --- a/src/main/java/reactor/core/publisher/Flux.java +++ b/src/main/java/reactor/core/publisher/Flux.java @@ -2302,6 +2302,7 @@ public final Flux delayMillis(long delay) { * * * @param delay period to delay each {@link Subscriber#onNext} call in milliseconds + * @param timer the timed scheduler to use for delaying each signal * * @return a throttled {@link Flux} * @@ -4419,6 +4420,7 @@ public final Cancellation subscribe(Consumer consumer) { * alt=""> * * @param consumer the consumer to invoke on each value + * @param prefetch the the prefetch amount, positive * * @return a new {@link Cancellation} to dispose the {@link Subscription} */ diff --git a/src/main/java/reactor/core/publisher/FluxSource.java b/src/main/java/reactor/core/publisher/FluxSource.java index 0ae74fc51a..ed84031180 100644 --- a/src/main/java/reactor/core/publisher/FluxSource.java +++ b/src/main/java/reactor/core/publisher/FluxSource.java @@ -21,8 +21,6 @@ import org.reactivestreams.Subscriber; import reactor.core.flow.Fuseable; import reactor.core.flow.Receiver; -import reactor.core.state.Backpressurable; -import reactor.core.state.Introspectable; /** * A connecting {@link Flux} Publisher (right-to-left from a composition chain perspective) diff --git a/src/main/java/reactor/core/publisher/MonoSource.java b/src/main/java/reactor/core/publisher/MonoSource.java index 52aebef887..984135d663 100644 --- a/src/main/java/reactor/core/publisher/MonoSource.java +++ b/src/main/java/reactor/core/publisher/MonoSource.java @@ -21,7 +21,6 @@ import org.reactivestreams.Subscriber; import reactor.core.flow.Fuseable; import reactor.core.flow.Receiver; -import reactor.core.state.Introspectable; import reactor.core.util.Exceptions; /** diff --git a/src/main/java/reactor/core/publisher/ParallelFlux.java b/src/main/java/reactor/core/publisher/ParallelFlux.java index e063fd5ea7..60ffdd48a1 100644 --- a/src/main/java/reactor/core/publisher/ParallelFlux.java +++ b/src/main/java/reactor/core/publisher/ParallelFlux.java @@ -33,7 +33,6 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.flow.Cancellation; import reactor.core.queue.QueueSupplier; import reactor.core.scheduler.Scheduler; import reactor.core.subscriber.LambdaSubscriber; @@ -802,7 +801,7 @@ public void subscribe(Consumer onNext, Consumer onError, Runnable onComplete){ @SuppressWarnings("unchecked") - Subscriber[] subscribers = (Subscriber[])new Subscriber[parallelism()]; + Subscriber[] subscribers = new Subscriber[parallelism()]; int i = 0; while(i < subscribers.length){ diff --git a/src/main/java/reactor/core/tuple/Tuple.java b/src/main/java/reactor/core/tuple/Tuple.java index a1e22df35c..5c3c7daef2 100644 --- a/src/main/java/reactor/core/tuple/Tuple.java +++ b/src/main/java/reactor/core/tuple/Tuple.java @@ -36,7 +36,8 @@ public abstract class Tuple implements Iterable, Serializable, Function { static final long serialVersionUID = 8777121294502020843L; static final Object[] emptyArray = new Object[0]; - static final Tuple empty = new Tuple(0){}; + @SuppressWarnings("serial") + static final Tuple empty = new Tuple(0){}; protected final int size; diff --git a/src/test/java/reactor/core/adapter/RxJavaPublisherTests.java b/src/test/java/reactor/core/adapter/RxJavaPublisherTests.java index 94a10f4921..851104decb 100644 --- a/src/test/java/reactor/core/adapter/RxJavaPublisherTests.java +++ b/src/test/java/reactor/core/adapter/RxJavaPublisherTests.java @@ -42,7 +42,6 @@ public long maxElementsFromPublisher() { } @Override - @SuppressWarnings("unchecked") public Publisher createPublisher(long elements) { return RxJava1Adapter.observableToFlux(Observable.range (0, @@ -51,7 +50,6 @@ public Publisher createPublisher(long elements) { } @Override - @SuppressWarnings("unchecked") public Publisher createFailedPublisher() { return RxJava1Adapter.observableToFlux(Observable.error(new Exception ("obs-test"))).cast(Long.class); diff --git a/src/test/java/reactor/core/adapter/RxJavaSinglePublisherTests.java b/src/test/java/reactor/core/adapter/RxJavaSinglePublisherTests.java index e1cf80afee..54ccd6c4da 100644 --- a/src/test/java/reactor/core/adapter/RxJavaSinglePublisherTests.java +++ b/src/test/java/reactor/core/adapter/RxJavaSinglePublisherTests.java @@ -42,13 +42,11 @@ public long maxElementsFromPublisher() { } @Override - @SuppressWarnings("unchecked") public Publisher createPublisher(long elements) { return RxJava1Adapter.singleToMono(Single.just(0)).cast(Long.class); } @Override - @SuppressWarnings("unchecked") public Publisher createFailedPublisher() { return RxJava1Adapter.singleToMono(Single.error(new Exception("single-test"))); } diff --git a/src/test/java/reactor/core/publisher/ParallelFluxTest.java b/src/test/java/reactor/core/publisher/ParallelFluxTest.java index 448a1e00dd..dae145e041 100644 --- a/src/test/java/reactor/core/publisher/ParallelFluxTest.java +++ b/src/test/java/reactor/core/publisher/ParallelFluxTest.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.function.Supplier; -import org.junit.Assert; import org.junit.Test; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; diff --git a/src/test/java/reactor/core/publisher/scenarios/FizzBuzzTests.java b/src/test/java/reactor/core/publisher/scenarios/FizzBuzzTests.java index 2d94b78b90..d9a42778c5 100644 --- a/src/test/java/reactor/core/publisher/scenarios/FizzBuzzTests.java +++ b/src/test/java/reactor/core/publisher/scenarios/FizzBuzzTests.java @@ -40,7 +40,7 @@ public class FizzBuzzTests extends AbstractReactorTest { @Test public void fizzTest() throws Throwable { int numOfItems = 1024; - int batchSize = 8; +// int batchSize = 8; final Timer timer = new Timer(); AtomicLong globalCounter = new AtomicLong(); diff --git a/src/test/java/reactor/core/publisher/scenarios/FluxTests.java b/src/test/java/reactor/core/publisher/scenarios/FluxTests.java index 6c8424d3ff..c51683deae 100644 --- a/src/test/java/reactor/core/publisher/scenarios/FluxTests.java +++ b/src/test/java/reactor/core/publisher/scenarios/FluxTests.java @@ -297,7 +297,7 @@ public void mapManyFlushesAllValuesThoroughly() throws InterruptedException { return Integer.parseInt(str); })); - Cancellation tail = tasks.subscribe(i -> { + /* Cancellation tail =*/ tasks.subscribe(i -> { latch.countDown(); }); @@ -386,7 +386,7 @@ public void mapNotifiesOnce() throws InterruptedException { EmitterProcessor d = EmitterProcessor.create(); SubmissionEmitter s = SubmissionEmitter.create(d); - Cancellation c = d.publishOn(asyncGroup) + /*Cancellation c = */d.publishOn(asyncGroup) .parallel(8) .groups() .subscribe(stream -> stream.publishOn(asyncGroup) @@ -815,7 +815,7 @@ public void shouldCorrectlyDispatchComplexFlow() throws InterruptedException { ); - Cancellation action = s + /*Cancellation action = */s .subscribe(integer -> { latch.countDown(); System.out.println(integer); @@ -893,7 +893,7 @@ public void testBeyondLongMaxMicroBatching() throws InterruptedException { .log("before") .publishOn(asyncGroup); - Cancellation tail = worker.log("after") + /*Cancellation tail = */worker.log("after") .parallel(2) .groups() .subscribe(s -> s.log("w"+s.key()) @@ -1022,7 +1022,7 @@ public void shouldWindowCorrectly() throws InterruptedException { CountDownLatch endLatch = new CountDownLatch(1000 / 100); - Cancellation controls = sensorDataStream + /*Cancellation controls = */sensorDataStream /* step 2 */.window(100) ///* step 3 */.timeout(1000) /* step 4 */ @@ -1094,7 +1094,7 @@ public void consistentMultithreadingWithPartition() throws InterruptedException CountDownLatch latch = new CountDownLatch(10); - Cancellation c = Flux.range(1, 10) + /*Cancellation c = */Flux.range(1, 10) .groupBy(n -> n % 2 == 0) .flatMap(stream -> stream.publishOn(supplier1) .log("groupBy-" + stream.key()))