3 changes: 2 additions & 1 deletion README.md
@@ -107,4 +107,5 @@ Add the environment variable `JAVA_TOOL_OPTIONS` to your Lambda functions and se

- Aws SDK V1
- Aws SDK V2
- Apache HTTP Client
- Apache HTTP Client
- Apache Kafka
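
With Kafka added to the supported clients, a plain producer send like the sketch below is picked up automatically once the tracer agent is attached via `JAVA_TOOL_OPTIONS`, with no application changes. Broker address, topic, and class names here are illustrative, not part of this PR:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // With the agent attached, this send is traced by the new Kafka instrumentation.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```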
9 changes: 9 additions & 0 deletions findbugs/findbugs-exclude.xml
@@ -2,4 +2,13 @@
<Match>
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME"/>
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
</FindBugsFilter>
9 changes: 8 additions & 1 deletion pom.xml
@@ -68,7 +68,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>2.2.6</version>
<version>3.11.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -133,6 +133,13 @@
<version>2.25.45</version>
</dependency>

<!-- Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>

<!-- Tracer dependencies -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
85 changes: 59 additions & 26 deletions src/main/java/io/lumigo/core/SpansContainer.java
@@ -13,17 +13,25 @@
import io.lumigo.core.utils.JsonUtils;
import io.lumigo.core.utils.SecretScrubber;
import io.lumigo.core.utils.StringUtils;
import io.lumigo.models.*;
import io.lumigo.models.HttpSpan;
import io.lumigo.models.Reportable;
import io.lumigo.models.Span;
import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
import lombok.Getter;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.pmw.tinylog.Logger;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.SdkResponse;
@@ -41,14 +49,16 @@ public class SpansContainer {
private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID";
private static final String FUNCTION_SPAN_TYPE = "function";
private static final String HTTP_SPAN_TYPE = "http";
private static final SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
public static final String KAFKA_SPAN_TYPE = "kafka";

private Span baseSpan;
private Span startFunctionSpan;
@Getter private Span startFunctionSpan;
private Long rttDuration;
private Span endFunctionSpan;
private Reporter reporter;
private List<HttpSpan> httpSpans = new LinkedList<>();
private SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
@Getter private List<BaseSpan> spans = new LinkedList<>();

private static final SpansContainer ourInstance = new SpansContainer();

public static SpansContainer getInstance() {
@@ -63,14 +73,15 @@ public void clear() {
rttDuration = null;
endFunctionSpan = null;
reporter = null;
httpSpans = new LinkedList<>();
spans = new LinkedList<>();
}

private SpansContainer() {}

public void init(Map<String, String> env, Reporter reporter, Context context, Object event) {
this.clear();
this.reporter = reporter;
this.secretScrubber = new SecretScrubber(new EnvUtil().getEnv());

int javaVersion = AwsUtils.parseJavaVersion(System.getProperty("java.version"));
if (javaVersion > 11) {
@@ -81,6 +92,7 @@ public void init(Map<String, String> env, Reporter reporter, Context context, Ob
Logger.debug("awsTracerId {}", awsTracerId);

AwsUtils.TriggeredBy triggeredBy = AwsUtils.extractTriggeredByFromEvent(event);

long startTime = System.currentTimeMillis();
this.baseSpan =
Span.builder()
@@ -166,8 +178,7 @@ public void start() {
.build();

try {
rttDuration =
reporter.reportSpans(prepareToSend(startFunctionSpan, false), MAX_REQUEST_SIZE);
rttDuration = reporter.reportSpans(prepareToSend(startFunctionSpan), MAX_REQUEST_SIZE);
} catch (Throwable e) {
Logger.error(e, "Failed to send start span");
}
@@ -214,25 +225,17 @@ private void end(Span endFunctionSpan) throws IOException {
MAX_REQUEST_SIZE);
}

public Span getStartFunctionSpan() {
return startFunctionSpan;
}

public List<Reportable> getAllCollectedSpans() {
List<Reportable> spans = new LinkedList<>();
public List<BaseSpan> getAllCollectedSpans() {
List<BaseSpan> spans = new LinkedList<>();
spans.add(endFunctionSpan);
spans.addAll(httpSpans);
spans.addAll(this.spans);
return spans;
}

public Span getEndSpan() {
return endFunctionSpan;
}

public List<HttpSpan> getHttpSpans() {
return httpSpans;
}

private String getStackTrace(Throwable throwable) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
@@ -307,7 +310,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
response.getStatusLine().getStatusCode())
.build())
.build());
httpSpans.add(httpSpan);
this.spans.add(httpSpan);
}

public void addHttpSpan(Long startTime, Request<?> request, Response<?> response) {
@@ -366,7 +369,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
.build());
AwsSdkV1ParserFactory.getParser(request.getServiceName())
.safeParse(httpSpan, request, response);
httpSpans.add(httpSpan);
this.spans.add(httpSpan);
}

public void addHttpSpan(
@@ -435,7 +438,37 @@ public void addHttpSpan(
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME))
.safeParse(httpSpan, context);

httpSpans.add(httpSpan);
this.spans.add(httpSpan);
}

public <K, V> void addKafkaProduceSpan(
Long startTime,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata producerMetadata,
ProducerRecord<K, V> record,
RecordMetadata recordMetadata,
Exception exception) {
this.spans.add(
KafkaSpanFactory.createProduce(
this.baseSpan,
startTime,
keySerializer,
valueSerializer,
producerMetadata,
record,
recordMetadata,
exception));
}

public void addKafkaConsumeSpan(
Long startTime,
KafkaConsumer<?, ?> consumer,
ConsumerMetadata consumerMetadata,
ConsumerRecords<?, ?> consumerRecords) {
this.spans.add(
KafkaSpanFactory.createConsume(
this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
}

private static String extractHeaders(Map<String, String> headers) {
@@ -522,18 +555,18 @@ protected static <T> T callIfVerbose(Callable<T> method) {
}
}

private Reportable prepareToSend(Reportable span, boolean hasError) {
return reduceSpanSize(span.scrub(secretScrubber), hasError);
private BaseSpan prepareToSend(BaseSpan span) {
return reduceSpanSize(span.scrub(secretScrubber), false);
}

private List<Reportable> prepareToSend(List<Reportable> spans, boolean hasError) {
for (Reportable span : spans) {
private List<BaseSpan> prepareToSend(List<BaseSpan> spans, boolean hasError) {
for (BaseSpan span : spans) {
reduceSpanSize(span.scrub(secretScrubber), hasError);
}
return spans;
}

public Reportable reduceSpanSize(Reportable span, boolean hasError) {
public BaseSpan reduceSpanSize(BaseSpan span, boolean hasError) {
int maxFieldSize =
hasError
? Configuration.getInstance().maxSpanFieldSizeWhenError()
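
The producer-side advice that feeds `addKafkaProduceSpan` is not shown in this diff. As a rough sketch of the wiring (the hook name and the way the producer internals reach it are assumptions, not the PR's actual code), an exit advice could pass the internals plus the broker ack, or the failure, straight through:

```java
import io.lumigo.core.SpansContainer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.common.serialization.Serializer;

public class ProducerAdviceSketch {
    // Hypothetical exit hook: reports one produce span per completed send,
    // with the exception argument null on success so failures are captured too.
    public static <K, V> void onSendExit(
            long startTime,
            Serializer<K> keySerializer,
            Serializer<V> valueSerializer,
            ProducerMetadata producerMetadata,
            ProducerRecord<K, V> record,
            RecordMetadata recordMetadata,
            Exception exception) {
        SpansContainer.getInstance()
                .addKafkaProduceSpan(
                        startTime, keySerializer, valueSerializer,
                        producerMetadata, record, recordMetadata, exception);
    }
}
```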
9 changes: 9 additions & 0 deletions src/main/java/io/lumigo/core/configuration/Configuration.java
@@ -28,6 +28,7 @@ public class Configuration {
public static final String LUMIGO_MAX_SIZE_FOR_REQUEST = "LUMIGO_MAX_SIZE_FOR_REQUEST";
public static final String LUMIGO_INSTRUMENTATION = "LUMIGO_INSTRUMENTATION";
public static final String LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX";
public static final String LUMIGO_MAX_BATCH_MESSAGE_IDS = "LUMIGO_MAX_BATCH_MESSAGE_IDS";

private static Configuration instance;
private LumigoConfiguration inlineConf;
@@ -137,4 +138,12 @@ public int maxRequestSize() {
LUMIGO_MAX_SIZE_FOR_REQUEST,
envUtil.getIntegerEnv(LUMIGO_MAX_RESPONSE_SIZE, 1024 * 500));
}

public int maxBatchMessageIds() {
int value = envUtil.getIntegerEnv(LUMIGO_MAX_BATCH_MESSAGE_IDS, 20);
if (value == 0) {
value = 20;
}
return value;
}
}
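
The fallback logic reads as: an unset or zero `LUMIGO_MAX_BATCH_MESSAGE_IDS` yields the default cap of 20, and any other integer wins. A quick illustration (values made up):

```java
// LUMIGO_MAX_BATCH_MESSAGE_IDS=50 -> 50
// LUMIGO_MAX_BATCH_MESSAGE_IDS=0  -> 20 (zero is treated as unset)
// variable not set                -> 20
int cap = Configuration.getInstance().maxBatchMessageIds();
```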
27 changes: 22 additions & 5 deletions src/main/java/io/lumigo/core/instrumentation/agent/Loader.java
@@ -3,9 +3,7 @@
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.not;

import io.lumigo.core.instrumentation.impl.AmazonHttpClientInstrumentation;
import io.lumigo.core.instrumentation.impl.AmazonHttpClientV2Instrumentation;
import io.lumigo.core.instrumentation.impl.ApacheHttpInstrumentation;
import io.lumigo.core.instrumentation.impl.*;
import net.bytebuddy.agent.builder.AgentBuilder;
import org.pmw.tinylog.Logger;

@@ -17,6 +15,10 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
new AmazonHttpClientInstrumentation();
AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
new AmazonHttpClientV2Instrumentation();
ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation =
new ApacheKafkaProducerInstrumentation();
ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation =
new ApacheKafkaConsumerInstrumentation();
AgentBuilder builder =
new AgentBuilder.Default()
.disableClassFormatChanges()
@@ -27,13 +29,28 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
.and(
not(
nameStartsWith(
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"))))
AmazonHttpClientV2Instrumentation
.INSTRUMENTATION_PACKAGE_PREFIX)))
.and(
not(
nameStartsWith(
ApacheKafkaProducerInstrumentation
.INSTRUMENTATION_PACKAGE_PREFIX)))
.and(
not(
nameStartsWith(
ApacheKafkaConsumerInstrumentation
.INSTRUMENTATION_PACKAGE_PREFIX))))
.type(apacheHttpInstrumentation.getTypeMatcher())
.transform(apacheHttpInstrumentation.getTransformer())
.type(amazonHttpClientInstrumentation.getTypeMatcher())
.transform(amazonHttpClientInstrumentation.getTransformer())
.type(amazonHttpClientV2Instrumentation.getTypeMatcher())
.transform(amazonHttpClientV2Instrumentation.getTransformer());
.transform(amazonHttpClientV2Instrumentation.getTransformer())
.type(apacheKafkaInstrumentation.getTypeMatcher())
.transform(apacheKafkaInstrumentation.getTransformer())
.type(apacheKafkaConsumerInstrumentation.getTypeMatcher())
.transform(apacheKafkaConsumerInstrumentation.getTransformer());

builder.installOn(inst);
Logger.debug("Finish Instrumentation");
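
The ignore chain in `instrument` acts as an allow-list: ByteBuddy skips every type except those under the package prefixes the instrumentations declare, so only HTTP client and Kafka classes pay a transformation cost. A condensed sketch of that pattern follows; the matcher the PR actually composes is longer, and the literal prefixes below are stand-ins for the `INSTRUMENTATION_PACKAGE_PREFIX` constants:

```java
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.not;

import net.bytebuddy.agent.builder.AgentBuilder;

public class IgnorePatternSketch {
    // Broad ignore with a carve-out per instrumented package: types under the
    // carved-out prefixes remain eligible for the .type()/.transform() pairs.
    static AgentBuilder allowListedBuilder() {
        return new AgentBuilder.Default()
                .disableClassFormatChanges()
                .ignore(
                        not(nameStartsWith("org.apache.http"))
                                .and(not(nameStartsWith(
                                        "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder")))
                                .and(not(nameStartsWith("org.apache.kafka.clients.consumer"))));
    }
}
```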
src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java
@@ -18,9 +18,13 @@
import software.amazon.awssdk.http.SdkHttpRequest;

public class AmazonHttpClientV2Instrumentation implements LumigoInstrumentationApi {

public static final String INSTRUMENTATION_PACKAGE_PREFIX =
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";

@Override
public ElementMatcher<TypeDescription> getTypeMatcher() {
return named("software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder");
return named(INSTRUMENTATION_PACKAGE_PREFIX);
}

@Override
src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java
@@ -0,0 +1,71 @@
package io.lumigo.core.instrumentation.impl;

import static net.bytebuddy.matcher.ElementMatchers.*;

import io.lumigo.core.SpansContainer;
import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
import io.lumigo.core.instrumentation.agent.Loader;
import io.lumigo.core.utils.LRUCache;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.pmw.tinylog.Logger;

public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {

public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";

@Override
public ElementMatcher<TypeDescription> getTypeMatcher() {
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
}

@Override
public AgentBuilder.Transformer.ForAdvice getTransformer() {
return new AgentBuilder.Transformer.ForAdvice()
.include(Loader.class.getClassLoader())
.advice(
isMethod()
.and(isPublic())
.and(named("poll"))
.and(takesArguments(1))
.and(
returns(
named(
"org.apache.kafka.clients.consumer.ConsumerRecords"))),
ApacheKafkaConsumerAdvice.class.getName());
}

public static class ApacheKafkaConsumerAdvice {
public static final SpansContainer spansContainer = SpansContainer.getInstance();
public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
try {
startTimeMap.put(clientId, System.currentTimeMillis());
} catch (Exception e) {
Logger.error(e);
}
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
@Advice.This KafkaConsumer<?, ?> consumer,
@Advice.FieldValue("metadata") ConsumerMetadata metadata,
@Advice.FieldValue("clientId") String clientId,
@Advice.Return ConsumerRecords<?, ?> consumerRecords) {
try {
Logger.info("Handling kafka request {}", consumerRecords.hashCode());
spansContainer.addKafkaConsumeSpan(
startTimeMap.get(clientId), consumer, metadata, consumerRecords);
} catch (Throwable error) {
Logger.error(error, "Failed to add kafka span");
}
}
}
}
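
For context, a consumer loop the advice above would match looks like the following; `poll` is public, takes a single argument, and returns `ConsumerRecords`, so entry stamps a start time per `clientId` and exit reports the consume span. Broker, group, and topic names are made up:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // Matched by the advice: poll(Duration) is public, takes one
            // argument, and returns ConsumerRecords.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("polled " + records.count() + " records");
        }
    }
}
```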