Skip to content

Commit

Permalink
TestPublisher.createNoncompliant().mono() drops violations (reactor#1244
Browse files Browse the repository at this point in the history
)

This commit prevents the TestPublisher with violations from hiding the
violations when using the `mono()` method, by calling Mono.fromDirect,
which doesn't provide any protection against malformed sources.
  • Loading branch information
simonbasle authored Jun 15, 2018
1 parent d166557 commit a879741
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,12 @@ public boolean wasRequested() {

@Override
public Mono<T> mono() {
return Mono.from(this);
if (violations.isEmpty()) {
return Mono.from(this);
}
else {
return Mono.fromDirect(this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,8 @@ public void onComplete() {
assertThat(count.get()).isEqualTo(1);
}

@Test
public void misbehavingAllowsMultipleTerminations() {
TestPublisher<String> publisher = TestPublisher.createNoncompliant(Violation.CLEANUP_ON_TERMINATE);
AtomicLong count = new AtomicLong();

Subscriber<String> subscriber = new CoreSubscriber<String>() {
private Subscriber<String> countingSubscriber(AtomicLong count) {
return new CoreSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
Expand All @@ -136,6 +132,13 @@ public void onComplete() {
count.incrementAndGet();
}
};
}

@Test
public void misbehavingAllowsMultipleTerminations() {
TestPublisher<String> publisher = TestPublisher.createNoncompliant(Violation.CLEANUP_ON_TERMINATE);
AtomicLong count = new AtomicLong();
Subscriber<String> subscriber = countingSubscriber(count);

publisher.subscribe(subscriber);

Expand All @@ -148,6 +151,41 @@ public void onComplete() {
publisher.assertCancelled();
}

@Test
public void misbehavingMonoAllowsMultipleTerminations() {
TestPublisher<String> publisher = TestPublisher.createNoncompliant(Violation.CLEANUP_ON_TERMINATE);
AtomicLong count = new AtomicLong();
Subscriber<String> subscriber = countingSubscriber(count);

publisher.mono().subscribe(subscriber);

publisher.error(new IllegalStateException("boom"))
.complete();

publisher.emit("A", "B", "C");

assertThat(count.get()).isEqualTo(3);
publisher.assertCancelled();
}

@Test
public void misbehavingFluxAllowsMultipleTerminations() {
TestPublisher<String> publisher = TestPublisher.createNoncompliant(Violation.CLEANUP_ON_TERMINATE);
AtomicLong count = new AtomicLong();

Subscriber<String> subscriber = countingSubscriber(count);

publisher.flux().subscribe(subscriber);

publisher.error(new IllegalStateException("boom"))
.complete();

publisher.emit("A", "B", "C");

assertThat(count.get()).isEqualTo(3);
publisher.assertCancelled();
}

@Test
public void expectSubscribers() {
TestPublisher<String> publisher = TestPublisher.create();
Expand Down

0 comments on commit a879741

Please sign in to comment.