Skip to content

Commit 00d352d

Browse files
Finish pyspark-shell trace using SparkListenerApplicationEnd
1 parent ae1c4c9 commit 00d352d

File tree

1 file changed

+24
-3
lines changed

1 file changed

+24
-3
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package datadog.trace.instrumentation.spark;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
5-
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6-
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
4+
import static net.bytebuddy.matcher.ElementMatchers.*;
75

86
import datadog.trace.agent.tooling.Instrumenter;
97
import datadog.trace.agent.tooling.InstrumenterModule;
108
import net.bytebuddy.asm.Advice;
9+
import org.apache.spark.deploy.SparkSubmitArguments;
1110

1211
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1312
implements Instrumenter.ForKnownTypes {
@@ -32,6 +31,14 @@ public String[] knownMatchingTypes() {
3231

3332
@Override
3433
public void methodAdvice(MethodTransformer transformer) {
34+
// Capture spark submit arguments
35+
transformer.applyAdvice(
36+
isMethod()
37+
.and(named("prepareSubmitEnvironment"))
38+
.and(takesArgument(0, named("org.apache.spark.deploy.SparkSubmitArguments")))
39+
.and(isDeclaredBy(named("org.apache.spark.deploy.SparkSubmit"))),
40+
AbstractSparkInstrumentation.class.getName() + "$PrepareSubmitEnvAdvice");
41+
3542
// SparkSubmit class used for non YARN/Mesos environment
3643
transformer.applyAdvice(
3744
isMethod()
@@ -47,6 +54,20 @@ public void methodAdvice(MethodTransformer transformer) {
4754
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
4855
}
4956

57+
public static class PrepareSubmitEnvAdvice {
58+
@Advice.OnMethodEnter(suppress = Throwable.class)
59+
public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {
60+
61+
// Using `python script.py`, spark JVM is launched as PythonGatewayServer, which is exited
62+
// using System.exit(0), leading to the exit advice not being called
63+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L540-L542
64+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala#L74
65+
if ("pyspark-shell".equals(submitArgs.primaryResource())) {
66+
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true;
67+
}
68+
}
69+
}
70+
5071
public static class RunMainAdvice {
5172
@Advice.OnMethodEnter(suppress = Throwable.class)
5273
public static void enter() {

0 commit comments

Comments
 (0)