Skip to content

Commit 2f0941b

Browse files
Merge pull request #2384 from newrelic/azure-service-bus-instr
Azure ServiceBus 7.15.0 instrumentation
2 parents 0619771 + 398a06e commit 2f0941b

File tree

10 files changed

+551
-0
lines changed

10 files changed

+551
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
dependencies {
2+
implementation(project(":agent-bridge"))
3+
implementation("com.azure:azure-messaging-servicebus:7.17.11")
4+
implementation("io.projectreactor:reactor-core:3.5.11")
5+
}
6+
7+
8+
9+
jar {
10+
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.azure-messaging-servicebus-7.15.0',
11+
'Implementation-Title-Alias': 'azure-messaging-servicebus-7.15.0' }
12+
}
13+
14+
verifyInstrumentation {
15+
passes 'com.azure:azure-messaging-servicebus:[7.15.0,)'
16+
}
17+
18+
site {
19+
title 'Azure Service Bus'
20+
type 'Messaging'
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.azure.messaging.servicebus;
9+
10+
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
11+
import com.newrelic.api.agent.NewRelic;
12+
import com.newrelic.api.agent.Segment;
13+
import com.newrelic.api.agent.Token;
14+
import com.newrelic.api.agent.Trace;
15+
import com.newrelic.api.agent.Transaction;
16+
import com.newrelic.api.agent.weaver.MatchType;
17+
import com.newrelic.api.agent.weaver.NewField;
18+
import com.newrelic.api.agent.weaver.Weave;
19+
import com.newrelic.api.agent.weaver.Weaver;
20+
import com.newrelic.utils.ServiceBusUtil;
21+
import reactor.core.publisher.Flux;
22+
23+
@Weave(type = MatchType.ExactClass, originalName = "com.azure.messaging.servicebus.ServiceBusAsyncConsumer")
24+
class ServiceBusAsyncConsumer_Instrumentation {
25+
26+
@NewField
27+
public String nrEntityPath;
28+
@NewField
29+
public MessagingEntityType nrEntityType;
30+
@NewField
31+
public String nrNamespace;
32+
33+
@Trace
34+
Flux<ServiceBusReceivedMessage> receive() {
35+
Transaction tx = NewRelic.getAgent().getTransaction();
36+
Token token = tx.getToken();
37+
Segment segment = tx.startSegment(ServiceBusUtil.LIBRARY, "receive");
38+
segment.reportAsExternal(ServiceBusUtil.generateExternalConsumeMetrics(nrEntityType, nrNamespace, nrEntityPath));
39+
40+
Flux<ServiceBusReceivedMessage> result = Weaver.callOriginal();
41+
42+
return ServiceBusUtil.registerFluxLifecycleHooks(result, segment, token);
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.azure.messaging.servicebus;
9+
10+
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
11+
import com.newrelic.api.agent.weaver.MatchType;
12+
import com.newrelic.api.agent.weaver.NewField;
13+
import com.newrelic.api.agent.weaver.Weave;
14+
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
15+
import com.newrelic.api.agent.weaver.Weaver;
16+
17+
@Weave(type = MatchType.ExactClass, originalName = "com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient")
18+
public class ServiceBusReceiverAsyncClient_Instrumentation {
19+
20+
private final MessagingEntityType entityType = Weaver.callOriginal();
21+
@NewField
22+
public MessagingEntityType nrEntityType = entityType;
23+
24+
@WeaveAllConstructors
25+
ServiceBusReceiverAsyncClient_Instrumentation() {
26+
// do nothing, this is just here to get passed the compiler and validator checks
27+
}
28+
29+
private ServiceBusAsyncConsumer_Instrumentation getOrCreateConsumer() {
30+
ServiceBusAsyncConsumer_Instrumentation result = Weaver.callOriginal();
31+
result.nrEntityPath = getEntityPath();
32+
result.nrEntityType = entityType;
33+
result.nrNamespace = getFullyQualifiedNamespace();
34+
35+
return result;
36+
}
37+
38+
public String getFullyQualifiedNamespace() {
39+
return Weaver.callOriginal();
40+
}
41+
42+
public String getEntityPath() {
43+
return Weaver.callOriginal();
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.azure.messaging.servicebus;
9+
10+
import com.azure.core.util.IterableStream;
11+
import com.newrelic.api.agent.Headers;
12+
import com.newrelic.api.agent.NewRelic;
13+
import com.newrelic.api.agent.Trace;
14+
import com.newrelic.api.agent.TransportType;
15+
import com.newrelic.api.agent.weaver.MatchType;
16+
import com.newrelic.api.agent.weaver.NewField;
17+
import com.newrelic.api.agent.weaver.Weave;
18+
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
19+
import com.newrelic.api.agent.weaver.Weaver;
20+
import com.newrelic.utils.HeadersWrapper;
21+
import com.newrelic.utils.ServiceBusUtil;
22+
23+
import java.time.Duration;
24+
25+
@Weave(type = MatchType.ExactClass, originalName = "com.azure.messaging.servicebus.ServiceBusReceiverClient")
26+
public class ServiceBusReceiverClient_Instrumentation {
27+
28+
private final ServiceBusReceiverAsyncClient asyncClient = Weaver.callOriginal();
29+
@NewField
30+
public ServiceBusReceiverAsyncClient_Instrumentation nrAsyncClient =
31+
ServiceBusReceiverAsyncClient_Instrumentation.class.cast(asyncClient);
32+
33+
@WeaveAllConstructors
34+
ServiceBusReceiverClient_Instrumentation() {
35+
// do nothing, this is just here to get passed the compiler and validator checks
36+
}
37+
38+
@Trace
39+
public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime) {
40+
NewRelic.getAgent().getTracedMethod().reportAsExternal(ServiceBusUtil.generateExternalConsumeMetrics(
41+
nrAsyncClient.nrEntityType, nrAsyncClient.getFullyQualifiedNamespace(), nrAsyncClient.getEntityPath()));
42+
43+
IterableStream<ServiceBusReceivedMessage> result = Weaver.callOriginal();
44+
45+
ServiceBusReceivedMessage firstMessage = ServiceBusUtil.getFirstMessage(result); // can't be done inside the weaved class
46+
if (firstMessage != null) {
47+
Headers headers = new HeadersWrapper(firstMessage.getApplicationProperties());
48+
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.ServiceBus, headers);
49+
}
50+
51+
return result;
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.azure.messaging.servicebus;
9+
10+
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
11+
import com.newrelic.api.agent.MessageProduceParameters;
12+
import com.newrelic.api.agent.NewRelic;
13+
import com.newrelic.api.agent.Trace;
14+
import com.newrelic.api.agent.weaver.MatchType;
15+
import com.newrelic.api.agent.weaver.NewField;
16+
import com.newrelic.api.agent.weaver.Weave;
17+
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
18+
import com.newrelic.api.agent.weaver.Weaver;
19+
import com.newrelic.utils.ServiceBusBatchRequestHeaders;
20+
import com.newrelic.utils.ServiceBusUtil;
21+
import reactor.core.publisher.Mono;
22+
23+
import java.util.logging.Level;
24+
25+
// Note: the ServiceBusSenderClient (non-Async) proxies to the Async client, so no need to instrument both
26+
// that is not true for the receiver
27+
@Weave(type = MatchType.ExactClass, originalName = "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient")
28+
public final class ServiceBusSenderAsyncClient_Instrumentation {
29+
30+
private final MessagingEntityType entityType = Weaver.callOriginal();
31+
private final String entityName = Weaver.callOriginal();
32+
private final String fullyQualifiedNamespace = Weaver.callOriginal();
33+
@NewField
34+
public String nrEntityName = entityName;
35+
@NewField
36+
public String nrFullyQualifiedNamespace = fullyQualifiedNamespace;
37+
@NewField
38+
public MessagingEntityType nrEntityType = entityType;
39+
40+
@WeaveAllConstructors
41+
ServiceBusSenderAsyncClient_Instrumentation() {
42+
// do nothing, this is just here to get passed the compiler and validator checks
43+
}
44+
45+
// this is called by the non-async version with the same signature
46+
@Trace
47+
public Mono<Void> sendMessage(ServiceBusMessage message) {
48+
ServiceBusBatchRequestHeaders headers = new ServiceBusBatchRequestHeaders(message);
49+
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
50+
message = headers.tryToUpdateHeaders();
51+
52+
MessageProduceParameters messageProduceParameters = ServiceBusUtil.generateExternalProduceMetrics(this);
53+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
54+
55+
return Weaver.callOriginal();
56+
}
57+
58+
// this is called by the non-async version with the same signature
59+
@Trace
60+
public Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) {
61+
ServiceBusBatchRequestHeaders headers = new ServiceBusBatchRequestHeaders(message);
62+
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
63+
message = headers.tryToUpdateHeaders();
64+
65+
MessageProduceParameters messageProduceParameters = ServiceBusUtil.generateExternalProduceMetrics(this);
66+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
67+
return Weaver.callOriginal();
68+
}
69+
70+
// this is called by the non-async version with the same signature
71+
@Trace
72+
public Mono<Void> sendMessages(ServiceBusMessageBatch batch) {
73+
batch = tryToUpdateBatch(batch);
74+
75+
MessageProduceParameters messageProduceParameters = ServiceBusUtil.generateExternalProduceMetrics(this);
76+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
77+
return Weaver.callOriginal();
78+
}
79+
80+
// this is called by the non-async version with the same signature
81+
@Trace
82+
public Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) {
83+
batch = tryToUpdateBatch(batch);
84+
85+
MessageProduceParameters messageProduceParameters = ServiceBusUtil.generateExternalProduceMetrics(this);
86+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
87+
return Weaver.callOriginal();
88+
}
89+
90+
// if we are unable to add DT headers for even 1 message here, we won't add it for any in the batch
91+
private ServiceBusMessageBatch tryToUpdateBatch(ServiceBusMessageBatch batch) {
92+
// first let's check to make sure the batch can hold all the new headers
93+
// Note: this won't be exact, but we dramatically overestimate the NR_DT_HEADER_SIZE to account
94+
if (batch.getMaxSizeInBytes() - batch.getSizeInBytes() < ServiceBusUtil.NR_DT_HEADER_SIZE * batch.getCount()) {
95+
NewRelic.getAgent().getLogger().log(Level.FINE, "Unable to add DT headers to batch, not enough space in batch");
96+
return batch;
97+
}
98+
99+
// now let's check to make sure that each message is small enough to add headers to
100+
// Note: this won't be exact, but we dramatically overestimate the NR_DT_HEADER_SIZE to account
101+
for (ServiceBusMessage message : batch.getMessages()) {
102+
if (message.getBody().getLength() + ServiceBusUtil.NR_DT_HEADER_SIZE > ServiceBusBatchRequestHeaders.DEFAULT_MAX_MESSAGE_SIZE) {
103+
NewRelic.getAgent().getLogger().log(Level.FINE, "Unable to add DT headers to batch, not enough space in message: "+message.getMessageId());
104+
return batch;
105+
}
106+
}
107+
108+
for (ServiceBusMessage message : batch.getMessages()) {
109+
ServiceBusBatchRequestHeaders headers = new ServiceBusBatchRequestHeaders(message);
110+
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
111+
// we've already checked for space, we can skip tryToUpdateHeaders(), it only adds the same check
112+
headers.addDTHeaders();
113+
}
114+
115+
return batch;
116+
}
117+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.newrelic.utils;
9+
10+
import com.newrelic.api.agent.HeaderType;
11+
import com.newrelic.api.agent.Headers;
12+
13+
import java.util.ArrayList;
14+
import java.util.Collection;
15+
import java.util.HashSet;
16+
import java.util.Map;
17+
18+
public class HeadersWrapper implements Headers {
19+
20+
private final Map<String, Object> delegate;
21+
22+
public HeadersWrapper(Map<String, Object> headers) {
23+
this.delegate = headers;
24+
}
25+
26+
@Override
27+
public HeaderType getHeaderType() {
28+
return HeaderType.MESSAGE;
29+
}
30+
31+
@Override
32+
public String getHeader(String name) {
33+
return delegate.get(name) == null ? null : delegate.get(name).toString();
34+
}
35+
36+
@Override
37+
public Collection<String> getHeaders(String name) {
38+
Collection<String> headers = new ArrayList<>();
39+
headers.add(getHeader(name));
40+
return headers;
41+
}
42+
43+
@Override
44+
public void setHeader(String name, String value) {
45+
delegate.remove(name);
46+
delegate.put(name, value);
47+
}
48+
49+
@Override
50+
public void addHeader(String name, String value) {
51+
delegate.put(name, value);
52+
}
53+
54+
@Override
55+
public Collection<String> getHeaderNames() {
56+
Collection<String> headerNames = new HashSet<>();
57+
for(String key : delegate.keySet()) {
58+
headerNames.add(key);
59+
}
60+
return headerNames;
61+
}
62+
63+
@Override
64+
public boolean containsHeader(String name) {
65+
return delegate.containsKey(name);
66+
}
67+
}

0 commit comments

Comments
 (0)