Skip to content

Commit f015cfc

Browse files
Finish pyspark-shell trace using SparkListenerApplicationEnd (#6956)
Finish the pyspark-shell application span when the Spark listener receives the SparkListenerApplicationEnd event. The application span is currently finished when exiting the runMain method in order to capture JVM errors; however, this method is not exited as expected (it terminates via System.exit(0)) when executing as pyspark-shell. A PySpark application can be launched using multiple methods, mainly: - spark-submit script.py - python script.py The Spark instrumentation works as expected in the spark-submit case, because that is Java/Scala code launched behind the scenes. In the python case, however, although the listener is injected as expected, the spark.application span is not closed properly because the JVM is launched as PythonGatewayServer, which exits via System.exit(0), so the exit advice is never called.
1 parent 7cbd5a5 commit f015cfc

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
public abstract class AbstractDatadogSparkListener extends SparkListener {
5656
public static volatile AbstractDatadogSparkListener listener = null;
5757
public static volatile boolean finishTraceOnApplicationEnd = true;
58+
public static volatile boolean isPysparkShell = false;
5859

5960
private final int MAX_COLLECTION_SIZE = 1000;
6061
private final int MAX_ACCUMULATOR_SIZE = 10000;

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
55
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
66
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
7+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
78

89
import datadog.trace.agent.tooling.Instrumenter;
910
import datadog.trace.agent.tooling.InstrumenterModule;
1011
import net.bytebuddy.asm.Advice;
12+
import org.apache.spark.deploy.SparkSubmitArguments;
1113

1214
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1315
implements Instrumenter.ForKnownTypes {
@@ -32,6 +34,14 @@ public String[] knownMatchingTypes() {
3234

3335
@Override
3436
public void methodAdvice(MethodTransformer transformer) {
37+
// Capture spark submit arguments
38+
transformer.applyAdvice(
39+
isMethod()
40+
.and(named("prepareSubmitEnvironment"))
41+
.and(takesArgument(0, named("org.apache.spark.deploy.SparkSubmitArguments")))
42+
.and(isDeclaredBy(named("org.apache.spark.deploy.SparkSubmit"))),
43+
AbstractSparkInstrumentation.class.getName() + "$PrepareSubmitEnvAdvice");
44+
3545
// SparkSubmit class used for non YARN/Mesos environment
3646
transformer.applyAdvice(
3747
isMethod()
@@ -47,10 +57,29 @@ public void methodAdvice(MethodTransformer transformer) {
4757
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
4858
}
4959

60+
public static class PrepareSubmitEnvAdvice {
61+
@Advice.OnMethodEnter(suppress = Throwable.class)
62+
public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {
63+
64+
// Using pyspark `python script.py`, spark JVM is launched as PythonGatewayServer, which is
65+
// exited using System.exit(0), leading to the exit advice not being called
66+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L540-L542
67+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala#L74
68+
if ("pyspark-shell".equals(submitArgs.primaryResource())) {
69+
AbstractDatadogSparkListener.isPysparkShell = true;
70+
71+
// prepareSubmitEnvironment might be called before/after runMain depending on spark version
72+
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true;
73+
}
74+
}
75+
}
76+
5077
public static class RunMainAdvice {
5178
@Advice.OnMethodEnter(suppress = Throwable.class)
5279
public static void enter() {
53-
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = false;
80+
if (!AbstractDatadogSparkListener.isPysparkShell) {
81+
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = false;
82+
}
5483
}
5584

5685
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,41 @@ abstract class AbstractSparkTest extends AgentTestRunner {
221221
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true
222222
}
223223

224+
def "finish pyspark span launched with python onApplicationEnd"() {
225+
setup:
226+
def sparkSession = SparkSession.builder()
227+
.config("spark.master", "local[2]")
228+
.getOrCreate()
229+
230+
try {
231+
// Generating a fake submit of pyspark-shell
232+
def sparkSubmit = new SparkSubmit()
233+
sparkSubmit.doSubmit(["--verbose", "pyspark-shell"] as String[])
234+
}
235+
catch (Exception ignored) {}
236+
sparkSession.stop()
237+
238+
expect:
239+
assert AbstractDatadogSparkListener.isPysparkShell
240+
assert AbstractDatadogSparkListener.finishTraceOnApplicationEnd
241+
242+
assertTraces(1) {
243+
trace(1) {
244+
span {
245+
operationName "spark.application"
246+
resourceName "spark.application"
247+
spanType "spark"
248+
errored true
249+
parent()
250+
}
251+
}
252+
}
253+
254+
cleanup:
255+
AbstractDatadogSparkListener.isPysparkShell = false
256+
AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true
257+
}
258+
224259
def "generate databricks spans"() {
225260
setup:
226261
def sparkSession = SparkSession.builder()

0 commit comments

Comments
 (0)