Skip to content

Commit

Permalink
fix reactor#389 - support conditional subscriber in parallelflux (rea…
Browse files Browse the repository at this point in the history
  • Loading branch information
smaldini authored Sep 19, 2017
1 parent f7a2e00 commit c3c1e6e
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;

/**
Expand Down Expand Up @@ -47,19 +48,28 @@ public Object scanUnsafe(Attr key) {
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")

CoreSubscriber<? super T>[] parents = new CoreSubscriber[n];


boolean conditional = subscribers[0] instanceof Fuseable.ConditionalSubscriber;

for (int i = 0; i < n; i++) {
parents[i] = new FluxFilter.FilterSubscriber<>(subscribers[i], predicate);
if (conditional) {
parents[i] = new FluxFilter.FilterConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<T>)subscribers[i], predicate);
}
else {
parents[i] = new FluxFilter.FilterSubscriber<>(subscribers[i], predicate);
}
}

source.subscribe(parents);
}

Expand Down
15 changes: 12 additions & 3 deletions reactor-core/src/main/java/reactor/core/publisher/ParallelLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;

/**
Expand All @@ -39,17 +40,25 @@ final class ParallelLog<T> extends ParallelFlux<T> implements Scannable {
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")
CoreSubscriber<? super T>[] parents = new CoreSubscriber[n];


boolean conditional = subscribers[0] instanceof Fuseable.ConditionalSubscriber;

for (int i = 0; i < n; i++) {
parents[i] = new FluxPeek.PeekSubscriber<>(subscribers[i], log);
if (conditional) {
parents[i] = new FluxPeekFuseable.PeekConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<T>)subscribers[i], log);
}
else {
parents[i] = new FluxPeek.PeekSubscriber<>(subscribers[i], log);
}
}

source.subscribe(parents);
Expand Down
16 changes: 13 additions & 3 deletions reactor-core/src/main/java/reactor/core/publisher/ParallelMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;

/**
Expand Down Expand Up @@ -48,17 +49,26 @@ public Object scanUnsafe(Attr key) {
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super R>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")
CoreSubscriber<? super T>[] parents = new CoreSubscriber[n];


boolean conditional = subscribers[0] instanceof Fuseable.ConditionalSubscriber;

for (int i = 0; i < n; i++) {
parents[i] = new FluxMap.MapSubscriber<>(subscribers[i], mapper);
if (conditional) {
parents[i] =
new FluxMap.MapConditionalSubscriber<>((Fuseable.ConditionalSubscriber<R>) subscribers[i],
mapper);
}
else {
parents[i] = new FluxMap.MapSubscriber<>(subscribers[i], mapper);
}
}

source.subscribe(parents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

/**
* Execute a Consumer in each 'rail' for the current element passing through.
Expand Down Expand Up @@ -63,17 +64,26 @@ final class ParallelPeek<T> extends ParallelFlux<T> implements SignalPeek<T>{
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")

CoreSubscriber<? super T>[] parents = new CoreSubscriber[n];


boolean conditional = subscribers[0] instanceof Fuseable.ConditionalSubscriber;

for (int i = 0; i < n; i++) {
parents[i] = new FluxPeek.PeekSubscriber<>(subscribers[i], this);
if (conditional) {
parents[i] = new FluxPeekFuseable.PeekConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<T>)subscribers[i], this);
}
else {
parents[i] = new FluxPeek.PeekSubscriber<>(subscribers[i], this);
}
}

source.subscribe(parents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,33 @@ public Object scanUnsafe(Attr key) {
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;

@SuppressWarnings("unchecked")

CoreSubscriber<T>[] parents = new CoreSubscriber[n];


boolean conditional = subscribers[0] instanceof Fuseable.ConditionalSubscriber;

for (int i = 0; i < n; i++) {
CoreSubscriber<? super T> a = subscribers[i];

Worker w = scheduler.createWorker();

CoreSubscriber<T> parent = new FluxPublishOn.PublishOnSubscriber<>(a,
scheduler, w, true,
prefetch, queueSupplier);
parents[i] = parent;
if (conditional) {
parents[i] = new FluxPublishOn.PublishOnConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<T>)subscribers[i],
scheduler, w, true,
prefetch, queueSupplier);
}
else {
parents[i] = new FluxPublishOn.PublishOnSubscriber<>(subscribers[i],
scheduler, w, true,
prefetch, queueSupplier);
}
}

source.subscribe(parents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,6 +18,8 @@

import org.junit.Test;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -44,4 +46,19 @@ public void scanOperator() throws Exception {
.isNotEqualTo(source.getPrefetch());
}

@Test
public void conditional() {
Flux<Integer> source = Flux.range(1, 1_000);
for (int i = 1; i < 33; i++) {
Flux<Integer> result = ParallelFlux.from(source, i)
.filter(t -> true)
.filter(t -> true)
.sequential();

StepVerifier.create(result)
.expectNextCount(1_000)
.verifyComplete();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,6 +18,8 @@

import org.junit.Test;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -44,4 +46,18 @@ public void scanOperator() throws Exception {
.isEqualTo(source.getPrefetch());
}

@Test
public void conditional() {
Flux<Integer> source = Flux.range(1, 1_000);
for (int i = 1; i < 33; i++) {
Flux<Integer> result = ParallelFlux.from(source, i)
.log()
.filter(t -> true)
.sequential();

StepVerifier.create(result)
.expectNextCount(1_000)
.verifyComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,6 +18,8 @@

import org.junit.Test;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -41,4 +43,18 @@ public void scanOperator() {
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(-1);
}

@Test
public void conditional() {
Flux<Integer> source = Flux.range(1, 1_000);
for (int i = 1; i < 33; i++) {
Flux<Integer> result = ParallelFlux.from(source, i)
.map(v -> v + 1)
.filter(t -> true)
.sequential();

StepVerifier.create(result)
.expectNextCount(1_000)
.verifyComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,6 +18,8 @@

import org.junit.Test;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -45,4 +47,19 @@ public void scanOperator() {
.isEqualTo(test.getPrefetch())
.isEqualTo(123);
}

@Test
public void conditional() {
Flux<Integer> source = Flux.range(1, 1_000);
for (int i = 1; i < 33; i++) {
Flux<Integer> result = ParallelFlux.from(source, i)
.doOnNext(d -> {})
.filter(t -> true)
.sequential();

StepVerifier.create(result)
.expectNextCount(1_000)
.verifyComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -19,6 +19,7 @@
import org.junit.Test;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -42,4 +43,18 @@ public void parallelism() {
assertThat(test.parallelism()).isEqualTo(2);
}

@Test
public void conditional() {
Flux<Integer> source = Flux.range(1, 1_000);
for (int i = 1; i < 33; i++) {
Flux<Integer> result = ParallelFlux.from(source, i)
.runOn(Schedulers.parallel())
.filter(t -> true)
.sequential();

StepVerifier.create(result)
.expectNextCount(1_000)
.verifyComplete();
}
}
}

0 comments on commit c3c1e6e

Please sign in to comment.