Skip to content

Commit 4e201cc

Browse files
Enhance Visibility Into Reactor FlatMapMain (#2308)
* add instrumentation FlatMapMain calls * move token linking out of constructor and into onSubscribe
1 parent ebf92d3 commit 4e201cc

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package reactor.core.publisher;
9+
10+
import com.newrelic.agent.bridge.AgentBridge;
11+
import com.newrelic.agent.bridge.Transaction;
12+
import com.newrelic.api.agent.Trace;
13+
import com.newrelic.api.agent.weaver.MatchType;
14+
import com.newrelic.api.agent.weaver.NewField;
15+
import com.newrelic.api.agent.weaver.Weave;
16+
import com.newrelic.api.agent.Token;
17+
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
18+
import com.newrelic.api.agent.weaver.Weaver;
19+
import org.reactivestreams.Subscription;
20+
21+
@Weave(type = MatchType.ExactClass, originalName = "reactor.core.publisher.MonoFlatMap")
22+
abstract class MonoFlatMap_Instrumentation {
23+
24+
@Weave(type = MatchType.ExactClass, originalName = "reactor.core.publisher.MonoFlatMap$FlatMapMain")
25+
static final class FlatMapMain_Instrumentation<T, R> {
26+
27+
@NewField
28+
private Token token;
29+
30+
@WeaveAllConstructors
31+
FlatMapMain_Instrumentation() {}
32+
33+
public void onSubscribe(Subscription s) {
34+
Transaction tx = AgentBridge.getAgent().getTransaction(false);
35+
if (tx != null && tx.isTransactionNameSet() && token == null) {
36+
token = tx.getToken();
37+
if (token != null && token.isActive()) {
38+
token.link();
39+
}
40+
}
41+
Weaver.callOriginal();
42+
}
43+
44+
@Trace(async = true)
45+
public void onNext(T t) {
46+
if (token != null && token.isActive()) {
47+
token.link();
48+
}
49+
Weaver.callOriginal();
50+
}
51+
52+
public void onComplete() {
53+
if (token != null && token.isActive()) {
54+
token.linkAndExpire();
55+
token = null;
56+
}
57+
Weaver.callOriginal();
58+
}
59+
60+
public void onError(Throwable t) {
61+
if (token != null && token.isActive()) {
62+
token.linkAndExpire();
63+
token = null;
64+
}
65+
Weaver.callOriginal();
66+
}
67+
68+
public void cancel() {
69+
if (token != null && token.isActive()) {
70+
token.linkAndExpire();
71+
token = null;
72+
}
73+
Weaver.callOriginal();
74+
}
75+
}
76+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package reactor.core.publisher;
9+
10+
import com.newrelic.agent.bridge.AgentBridge;
11+
import com.newrelic.agent.bridge.Transaction;
12+
import com.newrelic.api.agent.Trace;
13+
import com.newrelic.api.agent.weaver.MatchType;
14+
import com.newrelic.api.agent.weaver.NewField;
15+
import com.newrelic.api.agent.weaver.Weave;
16+
import com.newrelic.api.agent.Token;
17+
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
18+
import com.newrelic.api.agent.weaver.Weaver;
19+
import org.reactivestreams.Subscription;
20+
21+
@Weave(type = MatchType.ExactClass, originalName = "reactor.core.publisher.MonoFlatMap")
22+
abstract class MonoFlatMap_Instrumentation {
23+
24+
@Weave(type = MatchType.ExactClass, originalName = "reactor.core.publisher.MonoFlatMap$FlatMapMain")
25+
static final class FlatMapMain_Instrumentation<T, R> {
26+
27+
@NewField
28+
private Token token;
29+
30+
@WeaveAllConstructors
31+
FlatMapMain_Instrumentation() {}
32+
33+
public void onSubscribe(Subscription s) {
34+
Transaction tx = AgentBridge.getAgent().getTransaction(false);
35+
if (tx != null && tx.isTransactionNameSet() && token == null) {
36+
token = tx.getToken();
37+
if (token != null && token.isActive()) {
38+
token.link();
39+
}
40+
}
41+
Weaver.callOriginal();
42+
}
43+
44+
@Trace(async = true)
45+
public void onNext(T t) {
46+
if (token != null && token.isActive()) {
47+
token.link();
48+
}
49+
Weaver.callOriginal();
50+
}
51+
52+
public void onComplete() {
53+
if (token != null && token.isActive()) {
54+
token.linkAndExpire();
55+
token = null;
56+
}
57+
Weaver.callOriginal();
58+
}
59+
60+
public void onError(Throwable t) {
61+
if (token != null && token.isActive()) {
62+
token.linkAndExpire();
63+
token = null;
64+
}
65+
Weaver.callOriginal();
66+
}
67+
68+
public void cancel() {
69+
if (token != null && token.isActive()) {
70+
token.linkAndExpire();
71+
token = null;
72+
}
73+
Weaver.callOriginal();
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)