Skip to content

Commit

Permalink
Avoid finalize in test, add MemoryUtils, alt build on JDK11 (reactor#…
Browse files Browse the repository at this point in the history
…1407)

GC1 pause tuning to attempt to prevent GC-related tests failing

Bump to Gradle 4.10.2 and bump tools for Java 11
 - Mockito
 - Jacoco
 - AssertJ
 - JMH plugin

Travis build on JDK11 instead of 9

Avoid using finalize() in tests but use a new util that
uses PhantomReferences.
  • Loading branch information
simonbasle authored Oct 25, 2018
1 parent e60c3e5 commit 74177c5
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: java
jdk:
- oraclejdk8
- oraclejdk9
- openjdk11

sudo: required

Expand Down
14 changes: 10 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ buildscript {

plugins {
id 'org.asciidoctor.convert' version '1.5.6'
id "me.champeau.gradle.jmh" version "0.4.4" apply false
id "me.champeau.gradle.jmh" version "0.4.7" apply false
id "org.jetbrains.dokka" version "0.9.16"
id "me.champeau.gradle.japicmp" version "0.2.6"
}
Expand All @@ -61,8 +61,8 @@ ext {
logbackVersion = '1.1.2'

// Testing
assertJVersion = '3.9.0'
mockitoVersion = '2.10.0'
assertJVersion = '3.11.1'
mockitoVersion = '2.23.0'
jUnitParamsVersion = '1.1.1'

javadocLinks = ["https://docs.oracle.com/javase/8/docs/api/",
Expand Down Expand Up @@ -115,7 +115,7 @@ configure(subprojects) { p ->
}

jacoco {
toolVersion = '0.7.9'
toolVersion = '0.8.2'
}

jacocoTestReport {
Expand Down Expand Up @@ -191,6 +191,12 @@ configure(subprojects) { p ->
stackTraceFilters "ENTRY_POINT"
maxGranularity 3
}

if (JavaVersion.current().isJava9Compatible()) {
println "Java 9+: lowering MaxGCPauseMillis to 20ms in ${project.name} ${name}"
jvmArgs = ["-XX:MaxGCPauseMillis=20"]
}

systemProperty("java.awt.headless", "true")
systemProperty("reactor.trace.cancel", "true")
systemProperty("reactor.trace.nocapacity", "true")
Expand Down
4 changes: 2 additions & 2 deletions gradle/setup.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

task wrapper(type: Wrapper, description: "Create a Gradle self-download wrapper") {
wrapper {
group = 'Project Setup'
gradleVersion = "4.2.1"
gradleVersion = "4.10.2"
}

configure(subprojects) { p ->
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.FluxBufferWhen.BufferWhenMainSubscriber;
import reactor.test.MemoryUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() {
@Test
public void timedOutBuffersDontLeak() throws InterruptedException {
LongAdder created = new LongAdder();
LongAdder finalized = new LongAdder();
MemoryUtils.RetainedDetector retainedDetector = new MemoryUtils.RetainedDetector();
class Wrapper {

final int i;
Expand All @@ -109,19 +110,14 @@ class Wrapper {
public String toString() {
return "{i=" + i + '}';
}

@Override
protected void finalize() {
finalized.increment();
}
}

final CountDownLatch latch = new CountDownLatch(1);
final UnicastProcessor<Wrapper> processor = UnicastProcessor.create();

Flux<Integer> emitter = Flux.range(1, 200)
.delayElements(Duration.ofMillis(25))
.doOnNext(i -> processor.onNext(new Wrapper(i)))
.doOnNext(i -> processor.onNext(retainedDetector.tracked(new Wrapper(i))))
.doOnError(processor::onError)
.doOnComplete(processor::onComplete);

Expand All @@ -134,7 +130,7 @@ protected void finalize() {
.map(t2 -> Tuples.of(t2.getT1(),
String.format("from %s to %s", t2.getT2().get(0),
t2.getT2().get(t2.getT2().size() - 1)),
finalized.longValue()))
retainedDetector.finalizedCount()))
.doOnNext(v -> LOGGER.debug(v.toString()))
.doOnComplete(latch::countDown)
.collectList();
Expand All @@ -153,7 +149,7 @@ protected void finalize() {
System.gc();
Thread.sleep(500);

assertThat(finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("final GC collects all")
.isEqualTo(created.longValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.MemoryUtils;
import reactor.test.MemoryUtils.RetainedDetector;
import reactor.test.MockUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
Expand Down Expand Up @@ -604,9 +606,9 @@ public int hashCode() {

@Test
public void distinctDefaultDoesntRetainObjects() throws InterruptedException {
DistinctDefault.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefault> test = Flux.range(1, 100)
.map(DistinctDefault::new)
.map(i -> retainedDetector.tracked(new DistinctDefault(i)))
.distinct();

StepVerifier.create(test)
Expand All @@ -616,16 +618,16 @@ public void distinctDefaultDoesntRetainObjects() throws InterruptedException {
System.gc();
Thread.sleep(500);

assertThat(DistinctDefault.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained")
.isEqualTo(100);
}

@Test
public void distinctDefaultErrorDoesntRetainObjects() throws InterruptedException {
DistinctDefaultError.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefaultError> test = Flux.range(1, 100)
.map(DistinctDefaultError::new)
.map(i -> retainedDetector.tracked(new DistinctDefaultError(i)))
.concatWith(Mono.error(new IllegalStateException("boom")))
.distinct();

Expand All @@ -636,16 +638,16 @@ public void distinctDefaultErrorDoesntRetainObjects() throws InterruptedExceptio
System.gc();
Thread.sleep(500);

assertThat(DistinctDefaultError.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained after error")
.isEqualTo(100);
}

@Test
public void distinctDefaultCancelDoesntRetainObjects() throws InterruptedException {
DistinctDefaultCancel.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefaultCancel> test = Flux.range(1, 100)
.map(DistinctDefaultCancel::new)
.map(i -> retainedDetector.tracked(new DistinctDefaultCancel(i)))
.concatWith(Mono.error(new IllegalStateException("boom")))
.distinct()
.take(50);
Expand All @@ -657,14 +659,13 @@ public void distinctDefaultCancelDoesntRetainObjects() throws InterruptedExcepti
System.gc();
Thread.sleep(500);

assertThat(DistinctDefaultCancel.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained after cancel")
.isEqualTo(50);
}

static class DistinctDefault {

static final AtomicLong finalized = new AtomicLong();
private final int i;

public DistinctDefault(int i) {
Expand All @@ -680,16 +681,10 @@ public int hashCode() {
public boolean equals(Object obj) {
return obj instanceof DistinctDefault && ((DistinctDefault) obj).i == i;
}

@Override
protected void finalize() throws Throwable {
finalized.incrementAndGet();
}
}

static class DistinctDefaultError {

static final AtomicLong finalized = new AtomicLong();
private final int i;

public DistinctDefaultError(int i) {
Expand All @@ -705,16 +700,10 @@ public int hashCode() {
public boolean equals(Object obj) {
return obj instanceof DistinctDefaultError && ((DistinctDefaultError) obj).i == i;
}

@Override
protected void finalize() throws Throwable {
finalized.incrementAndGet();
}
}

static class DistinctDefaultCancel {

static final AtomicLong finalized = new AtomicLong();
private final int i;

public DistinctDefaultCancel(int i) {
Expand All @@ -730,10 +719,5 @@ public int hashCode() {
public boolean equals(Object obj) {
return obj instanceof DistinctDefaultCancel && ((DistinctDefaultCancel) obj).i == i;
}

@Override
protected void finalize() throws Throwable {
finalized.incrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import reactor.core.publisher.FluxDistinctTest.DistinctDefault;
import reactor.core.publisher.FluxDistinctTest.DistinctDefaultCancel;
import reactor.core.publisher.FluxDistinctTest.DistinctDefaultError;
import reactor.test.MemoryUtils;
import reactor.test.MemoryUtils.RetainedDetector;
import reactor.test.MockUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
Expand Down Expand Up @@ -293,9 +295,9 @@ public int hashCode() {

@Test
public void distinctUntilChangedDefaultDoesntRetainObjects() throws InterruptedException {
DistinctDefault.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefault> test = Flux.range(1, 100)
.map(DistinctDefault::new)
.map(i -> retainedDetector.tracked(new DistinctDefault(i)))
.distinctUntilChanged();

StepVerifier.create(test)
Expand All @@ -305,16 +307,16 @@ public void distinctUntilChangedDefaultDoesntRetainObjects() throws InterruptedE
System.gc();
Thread.sleep(500);

assertThat(DistinctDefault.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained")
.isEqualTo(100);
}

@Test
public void distinctUntilChangedDefaultErrorDoesntRetainObjects() throws InterruptedException {
DistinctDefaultError.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefaultError> test = Flux.range(1, 100)
.map(DistinctDefaultError::new)
.map(i -> retainedDetector.tracked(new DistinctDefaultError(i)))
.concatWith(Mono.error(new IllegalStateException("boom")))
.distinctUntilChanged();

Expand All @@ -325,16 +327,16 @@ public void distinctUntilChangedDefaultErrorDoesntRetainObjects() throws Interru
System.gc();
Thread.sleep(500);

assertThat(DistinctDefaultError.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained after error")
.isEqualTo(100);
}

@Test
public void distinctUntilChangedDefaultCancelDoesntRetainObjects() throws InterruptedException {
DistinctDefaultCancel.finalized.set(0);
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefaultCancel> test = Flux.range(1, 100)
.map(DistinctDefaultCancel::new)
.map(i -> retainedDetector.tracked(new DistinctDefaultCancel(i)))
.concatWith(Mono.error(new IllegalStateException("boom")))
.distinctUntilChanged()
.take(50);
Expand All @@ -346,7 +348,7 @@ public void distinctUntilChangedDefaultCancelDoesntRetainObjects() throws Interr
System.gc();
Thread.sleep(500);

assertThat(FluxDistinctTest.DistinctDefaultCancel.finalized.longValue())
assertThat(retainedDetector.finalizedCount())
.as("none retained after cancel")
.isEqualTo(50);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package reactor.core.publisher;

import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +36,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.MemoryUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -61,10 +66,10 @@ static <T> void expect(AssertSubscriber<Flux<T>> ts, int index, T... values) {
.assertNoError();
}


@Test
public void noWindowRetained_gh975() throws InterruptedException {
LongAdder created = new LongAdder();
LongAdder finalized = new LongAdder();
class Wrapper {

final int i;
Expand All @@ -78,19 +83,15 @@ class Wrapper {
public String toString() {
return "{i=" + i + '}';
}

@Override
protected void finalize() {
finalized.increment();
}
}
MemoryUtils.RetainedDetector finalizedTracker = new MemoryUtils.RetainedDetector();

final CountDownLatch latch = new CountDownLatch(1);
final UnicastProcessor<Wrapper> processor = UnicastProcessor.create();

Flux<Integer> emitter = Flux.range(1, 400)
.delayElements(Duration.ofMillis(10))
.doOnNext(i -> processor.onNext(new Wrapper(i)))
.doOnNext(i -> processor.onNext(finalizedTracker.tracked(new Wrapper(i))))
.doOnComplete(processor::onComplete);

AtomicReference<FluxWindowWhen.WindowWhenMainSubscriber> startEndMain = new AtomicReference<>();
Expand All @@ -108,7 +109,7 @@ protected void finalize() {
.index()
.doOnNext(it -> System.gc())
//index, number of windows "in flight", finalized
.map(t2 -> Tuples.of(t2.getT1(), windows.get().size(), finalized.longValue()))
.map(t2 -> Tuples.of(t2.getT1(), windows.get().size(), finalizedTracker.finalizedCount()))
.doOnNext(v -> LOGGER.info(v.toString()))
.doOnComplete(latch::countDown)
.collectList();
Expand All @@ -131,7 +132,10 @@ protected void finalize() {
assertThat(windows.get().size())
.as("no window retained")
.isZero();
assertThat(finalized.longValue())

System.out.println(finalizedTracker.trackedTotal());

assertThat(finalizedTracker.finalizedCount())
.as("final GC collects all")
.isEqualTo(created.longValue());
}
Expand Down
Loading

0 comments on commit 74177c5

Please sign in to comment.