Skip to content

Commit 7b545a4

Browse files
authored
Prevent parallel invocation instrumentation (micronaut-projects#3432)
1 parent fc3832c commit 7b545a4

5 files changed

+74
-105
lines changed

runtime/src/main/java/io/micronaut/reactive/rxjava2/RxInstrumentedCompletableObserver.java

+13-18
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
@Internal
3131
final class RxInstrumentedCompletableObserver implements CompletableObserver, RxInstrumentedComponent {
3232
private final CompletableObserver source;
33-
private final InvocationInstrumenter instrumenter;
34-
private boolean active;
33+
private final InvocationInstrumenter onSubscribeInstrumenter;
34+
private final InvocationInstrumenter onResultInstrumenter;
3535

3636
/**
3737
* Default constructor.
@@ -41,53 +41,48 @@ final class RxInstrumentedCompletableObserver implements CompletableObserver, Rx
4141
*/
4242
RxInstrumentedCompletableObserver(CompletableObserver source, RxInstrumenterFactory instrumenterFactory) {
4343
this.source = source;
44-
this.instrumenter = instrumenterFactory.create();
44+
this.onSubscribeInstrumenter = instrumenterFactory.create();
45+
this.onResultInstrumenter = instrumenterFactory.create();
4546
}
4647

4748
@Override
4849
public void onSubscribe(Disposable d) {
49-
if (instrumenter == null || active) {
50+
if (onSubscribeInstrumenter == null) {
5051
source.onSubscribe(d);
5152
} else {
5253
try {
53-
active = true;
54-
instrumenter.beforeInvocation();
54+
onSubscribeInstrumenter.beforeInvocation();
5555
source.onSubscribe(d);
5656
} finally {
57-
instrumenter.afterInvocation(false);
58-
active = false;
57+
onSubscribeInstrumenter.afterInvocation();
5958
}
6059
}
6160
}
6261

6362
@Override
6463
public void onError(Throwable t) {
65-
if (instrumenter == null || active) {
64+
if (onResultInstrumenter == null) {
6665
source.onError(t);
6766
} else {
6867
try {
69-
active = true;
70-
instrumenter.beforeInvocation();
68+
onResultInstrumenter.beforeInvocation();
7169
source.onError(t);
7270
} finally {
73-
instrumenter.afterInvocation(false);
74-
active = false;
71+
onResultInstrumenter.afterInvocation();
7572
}
7673
}
7774
}
7875

7976
@Override
8077
public void onComplete() {
81-
if (instrumenter == null || active) {
78+
if (onResultInstrumenter == null) {
8279
source.onComplete();
8380
} else {
8481
try {
85-
active = true;
86-
instrumenter.beforeInvocation();
82+
onResultInstrumenter.beforeInvocation();
8783
source.onComplete();
8884
} finally {
89-
instrumenter.afterInvocation(false);
90-
active = false;
85+
onResultInstrumenter.afterInvocation();
9186
}
9287
}
9388
}

runtime/src/main/java/io/micronaut/reactive/rxjava2/RxInstrumentedMaybeObserver.java

+16-23
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
@Internal
3232
final class RxInstrumentedMaybeObserver<T> implements MaybeObserver<T>, RxInstrumentedComponent {
3333
private final MaybeObserver<T> source;
34-
private final InvocationInstrumenter instrumenter;
35-
private boolean active;
34+
private final InvocationInstrumenter onSubscribeInstrumenter;
35+
private final InvocationInstrumenter onResultInstrumenter;
3636

3737
/**
3838
* Default constructor.
@@ -42,69 +42,62 @@ final class RxInstrumentedMaybeObserver<T> implements MaybeObserver<T>, RxInstru
4242
*/
4343
RxInstrumentedMaybeObserver(MaybeObserver<T> source, RxInstrumenterFactory instrumenterFactory) {
4444
this.source = source;
45-
this.instrumenter = instrumenterFactory.create();
45+
this.onSubscribeInstrumenter = instrumenterFactory.create();
46+
this.onResultInstrumenter = instrumenterFactory.create();
4647
}
4748

4849
@Override
4950
public void onSubscribe(Disposable d) {
50-
if (instrumenter == null || active) {
51+
if (onSubscribeInstrumenter == null) {
5152
source.onSubscribe(d);
5253
} else {
5354
try {
54-
active = true;
55-
instrumenter.beforeInvocation();
55+
onSubscribeInstrumenter.beforeInvocation();
5656
source.onSubscribe(d);
5757
} finally {
58-
instrumenter.afterInvocation(false);
59-
active = false;
58+
onSubscribeInstrumenter.afterInvocation();
6059
}
6160
}
6261
}
6362

6463
@Override
6564
public void onError(Throwable t) {
66-
if (instrumenter == null || active) {
65+
if (onResultInstrumenter == null) {
6766
source.onError(t);
6867
} else {
6968
try {
70-
active = true;
71-
instrumenter.beforeInvocation();
69+
onResultInstrumenter.beforeInvocation();
7270
source.onError(t);
7371
} finally {
74-
instrumenter.afterInvocation(false);
75-
active = false;
72+
onResultInstrumenter.afterInvocation();
7673
}
7774
}
7875
}
7976

8077
@Override
8178
public void onSuccess(T value) {
82-
if (instrumenter == null || active) {
79+
if (onResultInstrumenter == null) {
8380
source.onSuccess(value);
8481
} else {
8582
try {
86-
active = true;
87-
instrumenter.beforeInvocation();
83+
onResultInstrumenter.beforeInvocation();
8884
source.onSuccess(value);
8985
} finally {
90-
instrumenter.afterInvocation(false);
91-
active = false;
86+
onResultInstrumenter.afterInvocation();
9287
}
9388
}
9489
}
9590

9691
@Override
9792
public void onComplete() {
98-
if (instrumenter == null || active) {
93+
if (onResultInstrumenter == null) {
9994
source.onComplete();
10095
} else {
10196
try {
102-
active = true;
103-
instrumenter.beforeInvocation();
97+
onResultInstrumenter.beforeInvocation();
10498
source.onComplete();
10599
} finally {
106-
instrumenter.afterInvocation(false);
107-
active = false;
100+
onResultInstrumenter.afterInvocation();
108101
}
109102
}
110103
}

runtime/src/main/java/io/micronaut/reactive/rxjava2/RxInstrumentedObserver.java

+16-23
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
@Internal
3232
final class RxInstrumentedObserver<T> implements Observer<T>, RxInstrumentedComponent {
3333
private final Observer<T> source;
34-
private final InvocationInstrumenter instrumenter;
35-
private boolean active;
34+
private final InvocationInstrumenter onSubscribeInstrumenter;
35+
private final InvocationInstrumenter onResultInstrumenter;
3636

3737
/**
3838
* Default constructor.
@@ -42,71 +42,64 @@ final class RxInstrumentedObserver<T> implements Observer<T>, RxInstrumentedComp
4242
*/
4343
RxInstrumentedObserver(Observer<T> source, RxInstrumenterFactory instrumenterFactory) {
4444
this.source = source;
45-
this.instrumenter = instrumenterFactory.create();
45+
this.onSubscribeInstrumenter = instrumenterFactory.create();
46+
this.onResultInstrumenter = instrumenterFactory.create();
4647
}
4748

4849
@Override
4950
public void onSubscribe(Disposable d) {
50-
if (instrumenter == null || active) {
51+
if (onSubscribeInstrumenter == null) {
5152
source.onSubscribe(d);
5253
} else {
5354
try {
54-
active = true;
55-
instrumenter.beforeInvocation();
55+
onSubscribeInstrumenter.beforeInvocation();
5656
source.onSubscribe(d);
5757
} finally {
58-
instrumenter.afterInvocation(false);
59-
active = false;
58+
onSubscribeInstrumenter.afterInvocation();
6059
}
6160
}
6261
}
6362

6463
@Override
6564
public void onNext(T t) {
66-
if (instrumenter == null || active) {
65+
if (onResultInstrumenter == null) {
6766
source.onNext(t);
6867
} else {
6968
try {
70-
active = true;
71-
instrumenter.beforeInvocation();
69+
onResultInstrumenter.beforeInvocation();
7270
source.onNext(t);
7371
} finally {
74-
instrumenter.afterInvocation(false);
75-
active = false;
72+
onResultInstrumenter.afterInvocation();
7673
}
7774
}
7875
}
7976

8077
@SuppressWarnings("Duplicates")
8178
@Override
8279
public void onError(Throwable t) {
83-
if (instrumenter == null || active) {
80+
if (onResultInstrumenter == null) {
8481
source.onError(t);
8582
} else {
8683
try {
87-
active = true;
88-
instrumenter.beforeInvocation();
84+
onResultInstrumenter.beforeInvocation();
8985
source.onError(t);
9086
} finally {
91-
instrumenter.afterInvocation(false);
92-
active = false;
87+
onResultInstrumenter.afterInvocation();
9388
}
9489
}
9590
}
9691

9792
@SuppressWarnings("Duplicates")
9893
@Override
9994
public void onComplete() {
100-
if (instrumenter == null || active) {
95+
if (onResultInstrumenter == null) {
10196
source.onComplete();
10297
} else {
10398
try {
104-
active = true;
105-
instrumenter.beforeInvocation();
99+
onResultInstrumenter.beforeInvocation();
106100
source.onComplete();
107101
} finally {
108-
instrumenter.afterInvocation(false);
109-
active = false;
102+
onResultInstrumenter.afterInvocation();
110103
}
111104
}
112105
}

runtime/src/main/java/io/micronaut/reactive/rxjava2/RxInstrumentedSingleObserver.java

+13-18
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
@Internal
3232
final class RxInstrumentedSingleObserver<T> implements SingleObserver<T>, RxInstrumentedComponent {
3333
private final SingleObserver<T> source;
34-
private final InvocationInstrumenter instrumenter;
35-
private boolean active;
34+
private final InvocationInstrumenter onSubscribeInstrumenter;
35+
private final InvocationInstrumenter onResultInstrumenter;
3636

3737
/**
3838
* Default constructor.
@@ -42,53 +42,48 @@ final class RxInstrumentedSingleObserver<T> implements SingleObserver<T>, RxInst
4242
*/
4343
RxInstrumentedSingleObserver(SingleObserver<T> source, RxInstrumenterFactory instrumenterFactory) {
4444
this.source = source;
45-
this.instrumenter = instrumenterFactory.create();
45+
this.onSubscribeInstrumenter = instrumenterFactory.create();
46+
this.onResultInstrumenter = instrumenterFactory.create();
4647
}
4748

4849
@Override
4950
public void onSubscribe(Disposable d) {
50-
if (instrumenter == null || active) {
51+
if (onSubscribeInstrumenter == null) {
5152
source.onSubscribe(d);
5253
} else {
5354
try {
54-
active = true;
55-
instrumenter.beforeInvocation();
55+
onSubscribeInstrumenter.beforeInvocation();
5656
source.onSubscribe(d);
5757
} finally {
58-
instrumenter.afterInvocation(false);
59-
active = false;
58+
onSubscribeInstrumenter.afterInvocation();
6059
}
6160
}
6261
}
6362

6463
@Override
6564
public void onError(Throwable t) {
66-
if (instrumenter == null || active) {
65+
if (onResultInstrumenter == null) {
6766
source.onError(t);
6867
} else {
6968
try {
70-
active = true;
71-
instrumenter.beforeInvocation();
69+
onResultInstrumenter.beforeInvocation();
7270
source.onError(t);
7371
} finally {
74-
instrumenter.afterInvocation(false);
75-
active = false;
72+
onResultInstrumenter.afterInvocation();
7673
}
7774
}
7875
}
7976

8077
@Override
8178
public void onSuccess(T value) {
82-
if (instrumenter == null || active) {
79+
if (onResultInstrumenter == null) {
8380
source.onSuccess(value);
8481
} else {
8582
try {
86-
active = true;
87-
instrumenter.beforeInvocation();
83+
onResultInstrumenter.beforeInvocation();
8884
source.onSuccess(value);
8985
} finally {
90-
instrumenter.afterInvocation(false);
91-
active = false;
86+
onResultInstrumenter.afterInvocation();
9287
}
9388
}
9489
}

0 commit comments

Comments
 (0)