Skip to content

Commit

Permalink
fix(instrumentation-aws-sdk): sqs message id missing on send command (#…
Browse files Browse the repository at this point in the history
…968)

* test(instrumentation-aws-sdk): make sure that message id attribute is set on send command

* fix(instrumentation-aws-sdk): set message id on send command

* style(instrumentation-aws-sdk): lint fixes

* chore(instrumentation-aws-sdk): use optional chaining in case data is null

* test(instrumentation-aws-sdk): cover sdk v2 sqsResponseHook messaging attributes

* style: lint fix

* test(instrumentation-aws-sdk): make sure correct http status code is used

* test(instrumentation-aws-sdk): rename test to inlcude sendMessage

Co-authored-by: Amir Blum <amir@aspecto.io>
  • Loading branch information
mentos1386 and Amir Blum authored May 3, 2022
1 parent eba9531 commit 8b36fe1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,45 +121,61 @@ export class SqsServiceExtension implements ServiceExtension {
tracer: Tracer,
config: AwsSdkInstrumentationConfig
) => {
const messages: SQS.Message[] = response?.data?.Messages;
if (messages) {
const queueUrl = this.extractQueueUrl(response.request.commandInput);
const queueName = this.extractQueueNameFromUrl(queueUrl);

pubsubPropagation.patchMessagesArrayToStartProcessSpans<SQS.Message>({
messages,
parentContext: trace.setSpan(context.active(), span),
tracer,
messageToSpanDetails: (message: SQS.Message) => ({
name: queueName ?? 'unknown',
parentContext: propagation.extract(
ROOT_CONTEXT,
extractPropagationContext(
message,
config.sqsExtractContextPropagationFromPayload
),
contextGetter
),
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs',
[SemanticAttributes.MESSAGING_DESTINATION]: queueName,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.QUEUE,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId,
[SemanticAttributes.MESSAGING_URL]: queueUrl,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
},
}),
processHook: (span: Span, message: SQS.Message) =>
config.sqsProcessHook?.(span, { message }),
});

pubsubPropagation.patchArrayForProcessSpans(
messages,
tracer,
context.active()
);
switch (response.request.commandName) {
case 'SendMessage':
span.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_ID,
response?.data?.MessageId
);
break;

case 'SendMessageBatch':
// TODO: How should this be handled?
break;

case 'ReceiveMessage': {
const messages: SQS.Message[] = response?.data?.Messages;
if (messages) {
const queueUrl = this.extractQueueUrl(response.request.commandInput);
const queueName = this.extractQueueNameFromUrl(queueUrl);

pubsubPropagation.patchMessagesArrayToStartProcessSpans<SQS.Message>({
messages,
parentContext: trace.setSpan(context.active(), span),
tracer,
messageToSpanDetails: (message: SQS.Message) => ({
name: queueName ?? 'unknown',
parentContext: propagation.extract(
ROOT_CONTEXT,
extractPropagationContext(
message,
config.sqsExtractContextPropagationFromPayload
),
contextGetter
),
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs',
[SemanticAttributes.MESSAGING_DESTINATION]: queueName,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.QUEUE,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId,
[SemanticAttributes.MESSAGING_URL]: queueUrl,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
},
}),
processHook: (span: Span, message: SQS.Message) =>
config.sqsProcessHook?.(span, { message }),
});

pubsubPropagation.patchArrayForProcessSpans(
messages,
tracer,
context.active()
);
}
break;
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ describe('instrumentation-aws-sdk-v3', () => {
'https://sqs.us-east-1.amazonaws.com/731241200085/otel-demo-aws-sdk',
MessageBody: 'payload example from v3 without batch',
};
await sqsClient.sendMessage(params);
const response = await sqsClient.sendMessage(params);
expect(getTestSpans().length).toBe(1);
const [span] = getTestSpans();

Expand All @@ -320,6 +320,9 @@ describe('instrumentation-aws-sdk-v3', () => {
expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual(
params.QueueUrl
);
expect(
span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID]
).toEqual(response.MessageId);
expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual(
200
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import * as AWS from 'aws-sdk';
import { AWSError } from 'aws-sdk';

import {
MessagingDestinationKindValues,
MessagingOperationValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';
Expand All @@ -41,11 +42,13 @@ import { Message } from 'aws-sdk/clients/sqs';
import * as expect from 'expect';
import * as sinon from 'sinon';
import * as messageAttributes from '../src/services/MessageAttributes';
import { AttributeNames } from '../src/enums';

const responseMockSuccess = {
requestId: '0000000000000',
error: null,
};
httpResponse: { statusCode: 200 },
} as AWS.Response<any, any>;

const extractContextSpy = sinon.spy(
messageAttributes,
Expand Down Expand Up @@ -361,6 +364,49 @@ describe('SQS', () => {
});

describe('hooks', () => {
it('sqsResponseHook for sendMessage should add messaging attributes', async () => {
const region = 'us-east-1';
const sqs = new AWS.SQS();
sqs.config.update({ region });

const QueueName = 'unittest';
const params = {
QueueUrl: `queue/url/for/${QueueName}`,
MessageBody: 'payload example from v2 without batch',
};

const response = await sqs.sendMessage(params).promise();

expect(getTestSpans().length).toBe(1);
const [span] = getTestSpans();

// make sure we have the general aws attributes:
expect(span.attributes[SemanticAttributes.RPC_SYSTEM]).toEqual('aws-api');
expect(span.attributes[SemanticAttributes.RPC_METHOD]).toEqual(
'SendMessage'
);
expect(span.attributes[SemanticAttributes.RPC_SERVICE]).toEqual('SQS');
expect(span.attributes[AttributeNames.AWS_REGION]).toEqual(region);

// custom messaging attributes
expect(span.attributes[SemanticAttributes.MESSAGING_SYSTEM]).toEqual(
'aws.sqs'
);
expect(
span.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND]
).toEqual(MessagingDestinationKindValues.QUEUE);
expect(span.attributes[SemanticAttributes.MESSAGING_DESTINATION]).toEqual(
QueueName
);
expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual(
params.QueueUrl
);
expect(span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID]).toEqual(
response.MessageId
);
expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual(200);
});

it('sqsProcessHook called and add message attribute to span', async () => {
const config = {
sqsProcessHook: (
Expand Down

0 comments on commit 8b36fe1

Please sign in to comment.