Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Akka Scheduler context propagation #12373

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new AkkaDispatcherInstrumentation(),
new AkkaActorCellInstrumentation(),
new AkkaDefaultSystemMessageQueueInstrumentation());
new AkkaDefaultSystemMessageQueueInstrumentation(),
new AkkaScheduleInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkaactor;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AkkaScheduleInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("akka.actor.LightArrayRevolverScheduler");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("schedule")
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
.and(takesArgument(1, named("scala.concurrent.duration.FiniteDuration")))
.and(takesArgument(2, named("java.lang.Runnable")))
.and(takesArgument(3, named("scala.concurrent.ExecutionContext"))),
AkkaScheduleInstrumentation.class.getName() + "$ScheduleAdvice");
transformer.applyAdviceToMethod(
named("scheduleOnce")
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
.and(takesArgument(1, named("java.lang.Runnable")))
.and(takesArgument(2, named("scala.concurrent.ExecutionContext"))),
AkkaScheduleInstrumentation.class.getName() + "$ScheduleOnceAdvice");
}

@SuppressWarnings("unused")
public static class ScheduleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enterSchedule(
@Advice.Argument(value = 2, readOnly = false) Runnable runnable) {
Context context = Java8BytecodeBridge.currentContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just for my own curiosity: why getting the current context from Java8BytecodeBride?
why not 'Context.current()' directly?

Copy link
Contributor Author

@pjfanning pjfanning Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what most of the code does so I did it too. Are you a regular contributor to this lib and if so, can you explain why I should change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is instrumented with the javaagent; for example, the `Java8BytecodeBridge` that should be used
says to use Java8BytecodeBridge.currentContext() inside Advice classes.

I assume this includes here but I am a first time contributor so I will change this if any of the regular contributors say so.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's needed when the Advice is inlined into bytecode that targets a version of Java prior to Java 8

interestingly, I think it will no longer be needed after #11457 (since we will no longer be inlining our advice code)

the javadoc explains a bit more:

* A helper for accessing methods that rely on new Java 8 bytecode features such as calling a static
* interface methods. In instrumentation, we may need to call these methods in code that is inlined
* into an instrumented class, however many times the instrumented class has been compiled to a
* previous version of bytecode and so we cannot inline calls to static interface methods, as those
* were not supported prior to Java 8 and will lead to a class verification error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what most of the code does so I did it too. Are you a regular contributor to this lib and if so, can you explain why I should change?

i was not asking for a change.. just curious to know the reason behind. @trask explained it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trask The existing opentelemetry-java-instrumentation code for Akka and Pekko uses Java8BytecodeBridge. Is it ok if I stick this 'style' and that all the code can be changed to stop using Java8BytecodeBridge in a future commit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, definitely, it's totally fine to use Java8BytecodeBridge

runnable = context.wrap(runnable);
}
}

@SuppressWarnings("unused")
public static class ScheduleOnceAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enterScheduleOnce(
@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
Context context = Java8BytecodeBridge.currentContext();
runnable = context.wrap(runnable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.akkaactor

import akka.pattern.after
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

class AkkaSchedulerTest {

@Test
def checkThatSpanWorksWithAkkaScheduledEvents(): Unit = {
val system = AkkaActors.system
implicit val executionContext = system.dispatcher
val tracer = GlobalOpenTelemetry.get.getTracer("test-tracer")
val initialSpan = tracer.spanBuilder("test").startSpan()
val scope = initialSpan.makeCurrent()
try {
val futureResult = for {
result1 <- Future {
compareSpanContexts(Span.current(), initialSpan)
1
}
_ = compareSpanContexts(Span.current(), initialSpan)
result2 <- after(200.millis, system.scheduler)(Future.successful(2))
_ = compareSpanContexts(Span.current(), initialSpan)
} yield result1 + result2
assertThat(Await.result(futureResult, 5.seconds)).isEqualTo(3)
} finally {
scope.close()
initialSpan.end()
}
}

private def compareSpanContexts(span1: Span, span2: Span): Unit = {
assertThat(span1.getSpanContext().getTraceId())
.isEqualTo(span2.getSpanContext().getTraceId())
assertThat(span1.getSpanContext().getSpanId())
.isEqualTo(span2.getSpanContext().getSpanId())
}
}
Loading