Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS SDK instrumentation - DynamoDB attributes #2262

Merged
merged 7 commits into from Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
some clean-up, swithced to AWS SDK marshalling for SdkPojos
  • Loading branch information
kuba-wu committed Feb 18, 2021
commit 06dc4e6eb4d9b27a1feb3cd9754b3b238c056da1
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ dependencies {
implementation deps.opentelemetryExtAws

library group: 'software.amazon.awssdk', name: 'aws-core', version: '2.2.0'
library group: 'software.amazon.awssdk', name: 'aws-json-protocol', version: '2.2.0'

implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.2'
This conversation was marked as resolved.
Show resolved Hide resolved
testImplementation project(':instrumentation:aws-sdk:aws-sdk-2.2:testing')
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.18.1'
This conversation was marked as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.Kinesis;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.S3;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.SQS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.of;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.response;

import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkRequest;
Expand All @@ -26,16 +27,90 @@ public enum AwsSdkRequest {
SqsRequest(SQS, "SqsRequest"),
KinesisRequest(Kinesis, "KinesisRequest"),
// specific requests
BatchGetItem(
DynamoDB,
"BatchGetItemRequest",
request("aws.dynamodb.table_names", "requestItems"),
response("aws.dynamodb.consumed_capacity", "consumedCapacity")),
BatchWriteItem(
DynamoDB,
"BatchWriteItemRequest",
request("aws.dynamodb.table_names", "requestItems"),
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "itemCollectionMetrics")),
CreateTable(
DynamoDB,
"CreateTableRequest",
of("awssdk.global_secondary_indexes", "globalSecondaryIndexes"),
of("awssdk.local_secondary_indexes", "localSecondaryIndexes"),
of(
"awssdk.provisioned_throughput.read_capacity_units",
request("aws.dynamodb.global_secondary_indexes", "globalSecondaryIndexes"),
request("aws.dynamodb.local_secondary_indexes", "localSecondaryIndexes"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
"provisionedThroughput.readCapacityUnits"),
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"provisionedThroughput.writeCapacityUnits")),
DeleteItem(
DynamoDB,
"DeleteItemRequest",
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "itemCollectionMetrics")),
GetItem(
DynamoDB,
"GetItemRequest",
request("aws.dynamodb.projection_expression", "projectionExpression"),
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
request("aws.dynamodb.consistent_read", "consistentRead")),
ListTables(
DynamoDB,
"ListTablesRequest",
request("aws.dynamodb.exclusive_start_table_name", "exclusiveStartTableName"),
response("aws.dynamodb.table_count", "tableNames"),
request("aws.dynamodb.limit", "limit")),
PutItem(
DynamoDB,
"PutItemRequest",
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "itemCollectionMetrics")),
Query(
DynamoDB,
"QueryRequest",
request("aws.dynamodb.attributes_to_get", "attributesToGet"),
request("aws.dynamodb.consistent_read", "consistentRead"),
request("aws.dynamodb.index_name", "indexName"),
request("aws.dynamodb.limit", "limit"),
request("aws.dynamodb.projection_expression", "projectionExpression"),
request("aws.dynamodb.scan_index_forward", "scanIndexForward"),
request("aws.dynamodb.select", "select"),
response("aws.dynamodb.consumed_capacity", "consumedCapacity")),
Scan(
DynamoDB,
"ScanRequest",
request("aws.dynamodb.attributes_to_get", "attributesToGet"),
request("aws.dynamodb.consistent_read", "consistentRead"),
request("aws.dynamodb.index_name", "indexName"),
request("aws.dynamodb.limit", "limit"),
request("aws.dynamodb.projection_expression", "projectionExpression"),
request("aws.dynamodb.segment", "segment"),
request("aws.dynamodb.select", "select"),
request("aws.dynamodb.total_segments", "totalSegments"),
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
response("aws.dynamodb.count", "count"),
response("aws.dynamodb.scanned_count", "scannedCount")),
UpdateItem(
DynamoDB,
"UpdateItemRequest",
response("aws.dynamodb.consumed_capacity", "consumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "itemCollectionMetrics")),
UpdateTable(
DynamoDB,
"UpdateTableRequest",
request("aws.dynamodb.attribute_definitions", "attributeDefinitions"),
request("aws.dynamodb.global_secondary_index_updates", "globalSecondaryIndexUpdates"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
"provisionedThroughput.readCapacityUnits"),
of(
"awssdk.provisioned_throughput.write_capacity_units",
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"provisionedThroughput.writeCapacityUnits"));

private final AwsSdkRequestType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@

package io.opentelemetry.instrumentation.awssdk.v2_2;

import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.of;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

enum AwsSdkRequestType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more a type of AWS service being used, not a request type -- how about AwsSdkServiceType?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it was more of a AWS SDK request type as contains field mappings - which belong more to a request than to a service. So in other words this model a "generic request type" rather than "a service".

S3(of("aws.bucket.name", "Bucket")),
SQS(of("aws.queue.url", "QueueUrl"), of("aws.queue.name", "QueueName")),
Kinesis(of("aws.stream.name", "StreamName")),
DynamoDB(of("aws.table.name", "TableName"), of(SemanticAttributes.DB_NAME.getKey(), "TableName"));
S3(request("aws.bucket.name", "Bucket")),
SQS(request("aws.queue.url", "QueueUrl"), request("aws.queue.name", "QueueName")),
Kinesis(request("aws.stream.name", "StreamName")),
DynamoDB(
request("aws.table.name", "TableName"),
request(SemanticAttributes.DB_NAME.getKey(), "TableName"));
This conversation was marked as resolved.
Show resolved Hide resolved

private final FieldMapping[] fieldMappings;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,57 @@

package io.opentelemetry.instrumentation.awssdk.v2_2;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.api.trace.Span;
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Collectors;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;

public class FieldMapper {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Serializer serializer = new Serializer();

static {
OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
private final ClassValue<ConcurrentHashMap<String, MethodHandle>> getterCache =
new ClassValue<ConcurrentHashMap<String, MethodHandle>>() {
@Override
protected ConcurrentHashMap<String, MethodHandle> computeValue(Class<?> type) {
return new ConcurrentHashMap<>();
}
};

public void mapFields(SdkRequest sdkRequest, AwsSdkRequest request, Span span) {
mapFields(
field -> sdkRequest.getValueForField(field, Object.class).orElse(null), request, span);
}

public void mapFields(SdkResponse sdkResponse, AwsSdkRequest request, Span span) {
mapFields(
field -> sdkResponse.getValueForField(field, Object.class).orElse(null), request, span);
}

public void mapFields(AwsSdkRequest request, SdkRequest sdkRequest, Span span) {
private void mapFields(
Function<String, Object> fieldValueProvider, AwsSdkRequest request, Span span) {
for (FieldMapping fieldMapping : request.fields()) {
mapFields(fieldMapping, sdkRequest, span);
mapFields(fieldValueProvider, fieldMapping, span);
}
for (FieldMapping fieldMapping : request.type().fields()) {
mapFields(fieldMapping, sdkRequest, span);
mapFields(fieldValueProvider, fieldMapping, span);
}
}

private void mapFields(FieldMapping fieldMapping, SdkRequest sdkRequest, Span span) {
private void mapFields(
Function<String, Object> fieldValueProvider, FieldMapping fieldMapping, Span span) {
// traverse path
String[] path = fieldMapping.getFields();
Object target = sdkRequest.getValueForField(camelCase(path[0]), Object.class).orElse(null);
Object target = fieldValueProvider.apply(camelCase(path[0]));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This camelCase() call wouldn't be needed if fields in AwsSdkRequestType were already camelCased, would it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, here's the deal - in AWS model, fields are CamelCased, but getters start with lowercase (FieldName vs fieldName). First field in a path is retrieved by getFieldValue call which takes field name, rest is retrieved with getter call. So either way we're going to transform the name, assuming we want to stay consistent with naming (avoid putting first camel cased and rest lowercased).

for (int i = 1; i < path.length && target != null; i++) {
target = next(target, path[i]);
}
if (target != null) {
String value = serialize(target);
String value = serializer.serialize(target);
if (value != null && !value.isEmpty()) {
span.setAttribute(fieldMapping.getAttribute(), value);
}
Expand All @@ -53,31 +66,23 @@ private String camelCase(String string) {
return string.substring(0, 1).toUpperCase() + string.substring(1);
}

@Nullable
private String serialize(Object target) {
if (target instanceof SdkPojo) {
try {
return OBJECT_MAPPER.writeValueAsString(target);
} catch (JsonProcessingException e) {
return null;
}
}
if (target instanceof List) {
List<Object> list = (List<Object>) target;
return list.stream().map(this::serialize).collect(Collectors.joining());
}
// simple type
return target.toString();
}

@Nullable
private Object next(Object current, String fieldName) {
try {
Method method = current.getClass().getMethod(fieldName);
return method.invoke(current);
} catch (Exception e) {
return forField(current.getClass(), fieldName).invoke(current);
} catch (Throwable t) {
// ignore
}
return null;
}

private MethodHandle forField(Class clazz, String fieldName)
throws NoSuchMethodException, IllegalAccessException {
MethodHandle methodHandle = getterCache.get(clazz).get(fieldName);
if (methodHandle == null) {
methodHandle = MethodHandles.publicLookup().unreflect(clazz.getMethod(fieldName));
getterCache.get(clazz).put(fieldName, methodHandle);
}
return methodHandle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@
package io.opentelemetry.instrumentation.awssdk.v2_2;

class FieldMapping {
static FieldMapping of(String attribute, String fieldPath) {
return new FieldMapping(attribute, fieldPath);

enum Type {
REQUEST,
RESPONSE;
}

static FieldMapping request(String attribute, String fieldPath) {
return new FieldMapping(Type.REQUEST, attribute, fieldPath);
}

FieldMapping(String attribute, String fieldPath) {
static FieldMapping response(String attribute, String fieldPath) {
return new FieldMapping(Type.RESPONSE, attribute, fieldPath);
}

FieldMapping(Type type, String attribute, String fieldPath) {
this.type = type;
this.attribute = attribute;
this.fieldPath = fieldPath;
this.fields = fieldPath.split("\\.");
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -24,6 +35,11 @@ String[] getFields() {
return fields;
}

Type getType() {
return type;
}

private final Type type;
private final String attribute;
private final String fieldPath;
This conversation was marked as resolved.
Show resolved Hide resolved
private final String[] fields;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v2_2;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkPojo;
This conversation was marked as resolved.
Show resolved Hide resolved
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.protocols.core.OperationInfo;
import software.amazon.awssdk.protocols.core.ProtocolMarshaller;
import software.amazon.awssdk.protocols.json.AwsJsonProtocolFactory;
import software.amazon.awssdk.utils.IoUtils;

public class Serializer {

private final ProtocolMarshaller<SdkHttpFullRequest> protocolMarshaller;

public Serializer() {
protocolMarshaller =
AwsJsonProtocolFactory.builder()
.clientConfiguration(
SdkClientConfiguration.builder()
.option(SdkClientOption.ENDPOINT, URI.create("http://empty"))
This conversation was marked as resolved.
Show resolved Hide resolved
.build())
.build()
.createProtocolMarshaller(
OperationInfo.builder()
.hasPayloadMembers(true)
.httpMethod(SdkHttpMethod.POST)
.build());
}

@Nullable
private String serialize(SdkPojo sdkPojo) {
Optional<ContentStreamProvider> optional =
protocolMarshaller.marshall(sdkPojo).contentStreamProvider();
return optional
.map(
csp -> {
try {
return IoUtils.toUtf8String(csp.newStream());
This conversation was marked as resolved.
Show resolved Hide resolved
} catch (IOException e) {
return null;
}
})
.orElse(null);
}

private String serialize(Collection<Object> collection) {
return collection.stream().map(this::serialize).collect(Collectors.joining(","));
}

@Nullable
public String serialize(Object target) {

if (target instanceof SdkPojo) {
return serialize((SdkPojo) target);
}
if (target instanceof Collection) {
return serialize((Collection<Object>) target);
}
if (target instanceof Map) {
return serialize(((Map) target).keySet());
}
// simple type
return target.toString();
}
}
Loading