-
Notifications
You must be signed in to change notification settings - Fork 992
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Observability instrumentation for Jakarta JMS
This commit adds a new `JmsInstrumentation` class that instruments an instances of a `jakarta.jms.Session` with the Observation API. This proxies the `MessageProducer` and `MessageConsumer` instances created by the session and creates dedicated observations: * `send*` method calls on `MessageProducer` will create `"jms.message.publish"` observations. * when configuring a `MessageListener` on `MessageConsumer` instances returned by the session, `"jms.message.process"` observations are created when messages are received and processed by the callback. Here is how an existing JMS Session instance can be instrumented for observability: ``` Session original = ... ObservationRegistry registry = ... Session session = JmsInstrumentation.instrumentSession(original, registry); Topic topic = session.createTopic("micrometer.test.topic"); MessageProducer producer = session.createProducer(topic); // this operation will create a "jms.message.publish" observation producer.send(session.createMessage("test message content")); MessageConsumer consumer = session.createConsumer(topic); // when a message is processed by the listener, // a "jms.message.process" observation is created consumer.setMessageListener(message -> consumeMessage(message)); ``` This change does not instrument `receive` methods on the `MessageConsumer` as there is little value here. The resulting metric would only measure the time it takes to receive the message (i.e. not process it) and there would be no actionable trace, as those methods return the received `Message` and its processing will not happen in tracing scope. Closes gh-4007
- Loading branch information
Showing
19 changed files
with
1,601 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
140 changes: 140 additions & 0 deletions
140
...java/io/micrometer/core/instrument/binder/jms/DefaultJmsProcessObservationConvention.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/* | ||
* Copyright 2023 VMware, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.micrometer.core.instrument.binder.jms; | ||
|
||
import io.micrometer.common.KeyValue; | ||
import io.micrometer.common.KeyValues; | ||
import io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.LowCardinalityKeyNames; | ||
import jakarta.jms.*; | ||
|
||
import static io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.*; | ||
|
||
/** | ||
* Default implementation for {@link JmsProcessObservationConvention}. | ||
* | ||
* @author Brian Clozel | ||
* @since 1.12.0 | ||
*/ | ||
public class DefaultJmsProcessObservationConvention implements JmsProcessObservationConvention { | ||
|
||
private static final KeyValue DESTINATION_TEMPORARY = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY, | ||
"true"); | ||
|
||
private static final KeyValue DESTINATION_DURABLE = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY, | ||
"false"); | ||
|
||
private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE); | ||
|
||
private static final KeyValue OPERATION_PROCESS = KeyValue.of(LowCardinalityKeyNames.OPERATION, "process"); | ||
|
||
private static final KeyValue DESTINATION_NAME_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, | ||
"unknown"); | ||
|
||
private static final KeyValue MESSAGE_CONVERSATION_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, | ||
"unknown"); | ||
|
||
private static final KeyValue MESSAGE_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, "unknown"); | ||
|
||
@Override | ||
public String getName() { | ||
return "jms.message.process"; | ||
} | ||
|
||
@Override | ||
public String getContextualName(JmsProcessObservationContext context) { | ||
return destinationName(context).getValue() + " process"; | ||
} | ||
|
||
@Override | ||
public KeyValues getLowCardinalityKeyValues(JmsProcessObservationContext context) { | ||
return KeyValues.of(exception(context), OPERATION_PROCESS, temporaryDestination(context)); | ||
} | ||
|
||
private KeyValue exception(JmsProcessObservationContext context) { | ||
Throwable error = context.getError(); | ||
if (error != null) { | ||
String simpleName = error.getClass().getSimpleName(); | ||
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION, | ||
!simpleName.isEmpty() ? simpleName : error.getClass().getName()); | ||
} | ||
return EXCEPTION_NONE; | ||
} | ||
|
||
protected KeyValue temporaryDestination(JmsProcessObservationContext context) { | ||
try { | ||
Message message = context.getCarrier(); | ||
Destination destination = message.getJMSDestination(); | ||
if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic) { | ||
return DESTINATION_TEMPORARY; | ||
} | ||
return DESTINATION_DURABLE; | ||
} | ||
catch (JMSException exc) { | ||
return DESTINATION_DURABLE; | ||
} | ||
} | ||
|
||
@Override | ||
public KeyValues getHighCardinalityKeyValues(JmsProcessObservationContext context) { | ||
return KeyValues.of(correlationId(context), destinationName(context), messageId(context)); | ||
} | ||
|
||
protected KeyValue correlationId(JmsProcessObservationContext context) { | ||
try { | ||
Message message = context.getCarrier(); | ||
if (message.getJMSCorrelationID() == null) { | ||
return MESSAGE_CONVERSATION_ID_UNKNOWN; | ||
} | ||
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID()); | ||
} | ||
catch (JMSException exc) { | ||
return MESSAGE_CONVERSATION_ID_UNKNOWN; | ||
} | ||
} | ||
|
||
protected KeyValue destinationName(JmsProcessObservationContext context) { | ||
try { | ||
Destination jmsDestination = context.getCarrier().getJMSDestination(); | ||
if (jmsDestination instanceof Queue) { | ||
Queue queue = (Queue) jmsDestination; | ||
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, queue.getQueueName()); | ||
} | ||
if (jmsDestination instanceof Topic) { | ||
Topic topic = (Topic) jmsDestination; | ||
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, topic.getTopicName()); | ||
} | ||
return DESTINATION_NAME_UNKNOWN; | ||
} | ||
catch (JMSException e) { | ||
return DESTINATION_NAME_UNKNOWN; | ||
} | ||
} | ||
|
||
protected KeyValue messageId(JmsProcessObservationContext context) { | ||
try { | ||
Message message = context.getCarrier(); | ||
if (message.getJMSMessageID() == null) { | ||
return MESSAGE_ID_UNKNOWN; | ||
} | ||
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, context.getCarrier().getJMSMessageID()); | ||
} | ||
catch (JMSException exc) { | ||
return MESSAGE_ID_UNKNOWN; | ||
} | ||
} | ||
|
||
} |
146 changes: 146 additions & 0 deletions
146
...java/io/micrometer/core/instrument/binder/jms/DefaultJmsPublishObservationConvention.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Copyright 2023 VMware, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.micrometer.core.instrument.binder.jms; | ||
|
||
import io.micrometer.common.KeyValue; | ||
import io.micrometer.common.KeyValues; | ||
import jakarta.jms.*; | ||
|
||
import static io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.*; | ||
|
||
/** | ||
* Default implementation for {@link JmsPublishObservationConvention}. | ||
* | ||
* @author Brian Clozel | ||
* @since 1.12.0 | ||
*/ | ||
public class DefaultJmsPublishObservationConvention implements JmsPublishObservationConvention { | ||
|
||
private static final KeyValue DESTINATION_TEMPORARY = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY, | ||
"true"); | ||
|
||
private static final KeyValue DESTINATION_DURABLE = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY, | ||
"false"); | ||
|
||
private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE); | ||
|
||
private static final KeyValue OPERATION_PUBLISH = KeyValue.of(LowCardinalityKeyNames.OPERATION, "publish"); | ||
|
||
private static final KeyValue MESSAGE_CONVERSATION_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, | ||
"unknown"); | ||
|
||
private static final KeyValue DESTINATION_NAME_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, | ||
"unknown"); | ||
|
||
private static final KeyValue MESSAGE_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, "unknown"); | ||
|
||
@Override | ||
public String getName() { | ||
return "jms.message.publish"; | ||
} | ||
|
||
@Override | ||
public String getContextualName(JmsPublishObservationContext context) { | ||
return destinationName(context).getValue() + " publish"; | ||
} | ||
|
||
@Override | ||
public KeyValues getLowCardinalityKeyValues(JmsPublishObservationContext context) { | ||
return KeyValues.of(exception(context), OPERATION_PUBLISH, temporaryDestination(context)); | ||
} | ||
|
||
private KeyValue exception(JmsPublishObservationContext context) { | ||
Throwable error = context.getError(); | ||
if (error != null) { | ||
String simpleName = error.getClass().getSimpleName(); | ||
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION, | ||
!simpleName.isEmpty() ? simpleName : error.getClass().getName()); | ||
} | ||
return EXCEPTION_NONE; | ||
} | ||
|
||
protected KeyValue temporaryDestination(JmsPublishObservationContext context) { | ||
Message message = context.getCarrier(); | ||
try { | ||
if (message != null) { | ||
Destination destination = message.getJMSDestination(); | ||
if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic) { | ||
return DESTINATION_TEMPORARY; | ||
} | ||
} | ||
return DESTINATION_DURABLE; | ||
} | ||
catch (JMSException exc) { | ||
return DESTINATION_DURABLE; | ||
} | ||
} | ||
|
||
@Override | ||
public KeyValues getHighCardinalityKeyValues(JmsPublishObservationContext context) { | ||
return KeyValues.of(correlationId(context), destinationName(context), messageId(context)); | ||
} | ||
|
||
protected KeyValue correlationId(JmsPublishObservationContext context) { | ||
try { | ||
Message message = context.getCarrier(); | ||
if (message == null || message.getJMSCorrelationID() == null) { | ||
return MESSAGE_CONVERSATION_ID_UNKNOWN; | ||
} | ||
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID()); | ||
} | ||
catch (JMSException exc) { | ||
return MESSAGE_CONVERSATION_ID_UNKNOWN; | ||
} | ||
} | ||
|
||
protected KeyValue destinationName(JmsPublishObservationContext context) { | ||
if (context.getCarrier() == null) { | ||
return DESTINATION_NAME_UNKNOWN; | ||
} | ||
try { | ||
Destination jmsDestination = context.getCarrier().getJMSDestination(); | ||
if (jmsDestination instanceof Queue) { | ||
Queue queue = (Queue) jmsDestination; | ||
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, queue.getQueueName()); | ||
} | ||
else if (jmsDestination instanceof Topic) { | ||
Topic topic = (Topic) jmsDestination; | ||
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, topic.getTopicName()); | ||
} | ||
else { | ||
return DESTINATION_NAME_UNKNOWN; | ||
} | ||
} | ||
catch (JMSException e) { | ||
return DESTINATION_NAME_UNKNOWN; | ||
} | ||
} | ||
|
||
protected KeyValue messageId(JmsPublishObservationContext context) { | ||
try { | ||
Message message = context.getCarrier(); | ||
if (message == null || message.getJMSMessageID() == null) { | ||
return MESSAGE_ID_UNKNOWN; | ||
} | ||
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, message.getJMSMessageID()); | ||
} | ||
catch (JMSException exc) { | ||
return MESSAGE_ID_UNKNOWN; | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.