Skip to content

Commit e849124

Browse files
committed
GH-3177: Optimize Observation when ObservationRegistry.NOOP
Fixes: #3177 There is some performance overhead when we deal with an `Observation` even if `ObservationRegistry.NOOP` * Fix respective `Observation` API usage to skip its calls when `ObservationRegistry.NOOP` # Conflicts: # spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java # spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java
1 parent 38d1f5c commit e849124

File tree

4 files changed

+66
-35
lines changed

4 files changed

+66
-35
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,18 @@ public void setupMessageListener(MessageListener messageListener) {
320320
if (micrometerHolder != null) {
321321
sample = micrometerHolder.start();
322322
}
323-
Observation observation =
324-
RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(this.observationConvention,
325-
DefaultRabbitStreamListenerObservationConvention.INSTANCE,
326-
() -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
327-
registry);
323+
Observation observation = null;
324+
if (!registry.isNoop()) {
325+
observation =
326+
RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(this.observationConvention,
327+
DefaultRabbitStreamListenerObservationConvention.INSTANCE,
328+
() -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
329+
registry);
330+
}
331+
328332
Object finalSample = sample;
329333
if (this.streamListener != null) {
330-
observation.observe(() -> {
334+
Runnable listenerCall = () -> {
331335
try {
332336
this.streamListener.onStreamMessage(message, context);
333337
if (finalSample != null) {
@@ -346,13 +350,19 @@ public void setupMessageListener(MessageListener messageListener) {
346350
}
347351
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
348352
}
349-
});
353+
};
354+
if (observation != null) {
355+
observation.observe(listenerCall);
356+
}
357+
else {
358+
listenerCall.run();
359+
}
350360
}
351361
else {
352362
Message message2 = this.streamConverter.toMessage(message, new StreamMessageProperties(context));
353363
if (this.messageListener instanceof ChannelAwareMessageListener channelAwareMessageListener) {
354364
try {
355-
observation.observe(() -> {
365+
Runnable listenerCall = () -> {
356366
try {
357367
channelAwareMessageListener.onMessage(message2, null);
358368
if (finalSample != null) {
@@ -372,14 +382,27 @@ public void setupMessageListener(MessageListener messageListener) {
372382
}
373383
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
374384
}
375-
});
385+
};
386+
if (observation != null) {
387+
observation.observe(listenerCall);
388+
}
389+
else {
390+
listenerCall.run();
391+
}
376392
}
377393
catch (Exception ex) { // NOSONAR
378394
this.logger.error(ex, "Listener threw an exception");
379395
}
380396
}
381397
else {
382-
observation.observe(() -> this.messageListener.onMessage(message2));
398+
MessageListener messageListenerToUse = this.messageListener;
399+
Assert.state(messageListenerToUse != null, "'messageListener' or 'streamListener' is required");
400+
if (observation != null) {
401+
observation.observe(() -> messageListenerToUse.onMessage(message2));
402+
}
403+
else {
404+
messageListenerToUse.onMessage(message2);
405+
}
383406
}
384407
}
385408
});

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application
8181

8282
private String beanName;
8383

84-
private ProducerCustomizer producerCustomizer = (name, builder) -> { };
84+
private ProducerCustomizer producerCustomizer = (name, builder) -> {
85+
};
8586

8687
private boolean observationEnabled;
8788

@@ -106,7 +107,6 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
106107
this.streamName = streamName;
107108
}
108109

109-
110110
private Producer createOrGetProducer() {
111111
if (this.producer == null) {
112112
this.lock.lock();
@@ -166,7 +166,6 @@ public void setSuperStreamRouting(Function<com.rabbitmq.stream.Message, String>
166166
}
167167
}
168168

169-
170169
/**
171170
* Set a converter for {@link #convertAndSend(Object)} operations.
172171
* @param messageConverter the converter.
@@ -239,13 +238,11 @@ public MessageConverter messageConverter() {
239238
return this.messageConverter;
240239
}
241240

242-
243241
@Override
244242
public StreamMessageConverter streamMessageConverter() {
245243
return this.streamConverter;
246244
}
247245

248-
249246
@Override
250247
public CompletableFuture<Boolean> send(Message message) {
251248
CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -274,7 +271,6 @@ public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable Messa
274271
return send(message2);
275272
}
276273

277-
278274
@Override
279275
public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
280276
CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -283,17 +279,23 @@ public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
283279
}
284280

285281
private void observeSend(com.rabbitmq.stream.Message message, CompletableFuture<Boolean> future) {
286-
Observation observation = RabbitStreamTemplateObservation.STREAM_TEMPLATE_OBSERVATION.observation(
287-
this.observationConvention, DefaultRabbitStreamTemplateObservationConvention.INSTANCE,
288-
() -> new RabbitStreamMessageSenderContext(message, this.beanName, this.streamName),
289-
obtainObservationRegistry());
290-
observation.start();
282+
ObservationRegistry registry = obtainObservationRegistry();
283+
Observation observation = null;
284+
if (registry != null && !registry.isNoop()) {
285+
observation = RabbitStreamTemplateObservation.STREAM_TEMPLATE_OBSERVATION.observation(
286+
this.observationConvention, DefaultRabbitStreamTemplateObservationConvention.INSTANCE,
287+
() -> new RabbitStreamMessageSenderContext(message, this.beanName, this.streamName),
288+
registry);
289+
observation.start();
290+
}
291291
try {
292292
createOrGetProducer().send(message, handleConfirm(future, observation));
293293
}
294294
catch (Exception ex) {
295-
observation.error(ex);
296-
observation.stop();
295+
if (observation != null) {
296+
observation.error(ex);
297+
observation.stop();
298+
}
297299
future.completeExceptionally(ex);
298300
}
299301
}
@@ -316,11 +318,13 @@ public MessageBuilder messageBuilder() {
316318
return createOrGetProducer().messageBuilder();
317319
}
318320

319-
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Observation observation) {
321+
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, @Nullable Observation observation) {
320322
return confStatus -> {
321323
if (confStatus.isConfirmed()) {
322324
future.complete(true);
323-
observation.stop();
325+
if (observation != null) {
326+
observation.stop();
327+
}
324328
}
325329
else {
326330
int code = confStatus.getCode();
@@ -332,8 +336,10 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
332336
default -> "Unknown code: " + code;
333337
};
334338
StreamSendException ex = new StreamSendException(errorMessage, code);
335-
observation.error(ex);
336-
observation.stop();
339+
if (observation != null) {
340+
observation.error(ex);
341+
observation.stop();
342+
}
337343
future.completeExceptionally(ex);
338344
}
339345
};

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import com.rabbitmq.client.Return;
5151
import com.rabbitmq.client.ShutdownListener;
5252
import com.rabbitmq.client.ShutdownSignalException;
53-
import io.micrometer.observation.Observation;
5453
import io.micrometer.observation.ObservationRegistry;
5554

5655
import org.springframework.amqp.AmqpConnectException;
@@ -2491,17 +2490,20 @@ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Me
24912490
}
24922491

24932492
protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey) {
2494-
24952493
if (!this.observationRegistryObtained && this.observationEnabled) {
24962494
obtainObservationRegistry(this.applicationContext);
24972495
this.observationRegistryObtained = true;
24982496
}
24992497
ObservationRegistry registry = getObservationRegistry();
2500-
Observation observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
2501-
DefaultRabbitTemplateObservationConvention.INSTANCE,
2502-
() -> new RabbitMessageSenderContext(message, this.beanName, exch, rKey), registry);
2503-
2504-
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
2498+
if (registry.isNoop()) {
2499+
sendToRabbit(channel, exch, rKey, mandatory, message);
2500+
}
2501+
else {
2502+
RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
2503+
DefaultRabbitTemplateObservationConvention.INSTANCE,
2504+
() -> new RabbitMessageSenderContext(message, this.beanName, exch, rKey), registry)
2505+
.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
2506+
}
25052507
}
25062508

25072509
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1511,7 +1511,7 @@ protected void invokeErrorHandler(Throwable ex) {
15111511
protected void executeListener(Channel channel, Object data) {
15121512
Observation observation;
15131513
ObservationRegistry registry = getObservationRegistry();
1514-
if (data instanceof Message message) {
1514+
if (data instanceof Message message && !registry.isNoop()) {
15151515
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(this.observationConvention,
15161516
DefaultRabbitListenerObservationConvention.INSTANCE,
15171517
() -> new RabbitMessageReceiverContext(message, getListenerId()), registry);

0 commit comments

Comments
 (0)