3333import java .util .LinkedHashMap ;
3434import java .util .List ;
3535import java .util .Map ;
36+ import java .util .Objects ;
3637import java .util .Optional ;
3738import java .util .Properties ;
3839import java .util .UUID ;
40+ import java .util .function .Consumer ;
41+ import java .util .stream .Collectors ;
3942import org .apache .spark .ExceptionFailure ;
4043import org .apache .spark .SparkConf ;
4144import org .apache .spark .TaskFailedReason ;
@@ -74,6 +77,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
7477 private final int MAX_COLLECTION_SIZE = 5000 ;
7578 private final int MAX_ACCUMULATOR_SIZE = 50000 ;
7679 private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags." ;
80+ private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage" ;
7781
7882 private final SparkConf sparkConf ;
7983 private final String sparkVersion ;
@@ -123,6 +127,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
123127 private long availableExecutorTime = 0 ;
124128
125129 private volatile boolean applicationEnded = false ;
130+ private SparkListener openLineageSparkListener = null ;
126131
127132 public AbstractDatadogSparkListener (SparkConf sparkConf , String appId , String sparkVersion ) {
128133 tracer = AgentTracer .get ();
@@ -151,8 +156,50 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
151156 finishApplication (System .currentTimeMillis (), null , 0 , null );
152157 }
153158 }));
159+ initApplicationSpanIfNotInitialized ();
160+ loadOlSparkListener ();
161+ }
162+
163+ static void setupSparkConf (SparkConf sparkConf ) {
164+ sparkConf .set ("spark.openlineage.transport.type" , "composite" );
165+ sparkConf .set ("spark.openlineage.transport.continueOnFailure" , "true" );
166+ sparkConf .set ("spark.openlineage.transport.transports.agent.type" , "http" );
167+ sparkConf .set ("spark.openlineage.transport.transports.agent.url" , getAgentHttpUrl ());
168+ sparkConf .set ("spark.openlineage.transport.transports.agent.endpoint" , AGENT_OL_ENDPOINT );
169+ sparkConf .set ("spark.openlineage.transport.transports.agent.compression" , "gzip" );
170+ }
154171
155- log .info ("Created datadog spark listener: {}" , this .getClass ().getSimpleName ());
172+ void setupTrace (SparkConf sc ) {
173+ sc .set (
174+ "spark.openlineage.run.tags" ,
175+ "_dd.trace_id:"
176+ + applicationSpan .context ().getTraceId ().toString ()
177+ + ";_dd.intake.emit_spans:false" );
178+ }
179+
180+ void loadOlSparkListener () {
181+ String className = "io.openlineage.spark.agent.OpenLineageSparkListener" ;
182+ Optional <Class > clazz = loadClass (className );
183+ if (!clazz .isPresent ()) {
184+ log .info ("OpenLineage integration is not present on the classpath" );
185+ return ;
186+ }
187+ try {
188+ setupSparkConf (sparkConf );
189+ sparkConf .set (
190+ "spark.openlineage.run.tags" ,
191+ "_dd.trace_id:"
192+ + applicationSpan .context ().getTraceId ().toString ()
193+ + ";_dd.ol_intake.emit_spans:false" );
194+
195+ openLineageSparkListener =
196+ (SparkListener )
197+ clazz .get ().getDeclaredConstructor (SparkConf .class ).newInstance (sparkConf );
198+ log .info (
199+ "Created OL spark listener: {}" , openLineageSparkListener .getClass ().getSimpleName ());
200+ } catch (Exception e ) {
201+ log .warn ("Failed to instantiate OL Spark Listener: {}" , e .toString ());
202+ }
156203 }
157204
158205 /** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -176,6 +223,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
176223 @ Override
177224 public synchronized void onApplicationStart (SparkListenerApplicationStart applicationStart ) {
178225 this .applicationStart = applicationStart ;
226+ initApplicationSpanIfNotInitialized ();
227+ notifyOl (this .openLineageSparkListener ::onApplicationStart , applicationStart );
179228 }
180229
181230 private void initApplicationSpanIfNotInitialized () {
@@ -233,6 +282,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
233282 log .info (
234283 "Received spark application end event, finish trace on this event: {}" ,
235284 finishTraceOnApplicationEnd );
285+ notifyOl (this .openLineageSparkListener ::onApplicationEnd , applicationEnd );
236286
237287 if (finishTraceOnApplicationEnd ) {
238288 finishApplication (applicationEnd .time (), null , 0 , null );
@@ -426,6 +476,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
426476 stageToJob .put (stageId , jobStart .jobId ());
427477 }
428478 jobSpans .put (jobStart .jobId (), jobSpan );
479+ notifyOl (this .openLineageSparkListener ::onJobStart , jobStart );
429480 }
430481
431482 @ Override
@@ -456,6 +507,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
456507 if (metrics != null ) {
457508 metrics .setSpanMetrics (jobSpan );
458509 }
510+ notifyOl (this .openLineageSparkListener ::onJobEnd , jobEnd );
459511
460512 jobSpan .finish (jobEnd .time () * 1000 );
461513 }
@@ -624,6 +676,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
624676
625677 Properties props = stageProperties .get (stageSpanKey );
626678 sendTaskSpan (stageSpan , taskEnd , props );
679+
680+ notifyOl (this .openLineageSparkListener ::onTaskEnd , taskEnd );
627681 }
628682
629683 private void sendTaskSpan (
@@ -705,6 +759,15 @@ public void onOtherEvent(SparkListenerEvent event) {
705759 updateAdaptiveSQLPlan (event );
706760 }
707761
762+ private <T extends SparkListenerEvent > void notifyOl (Consumer <T > ol , T event ) {
763+ if (this .openLineageSparkListener != null ) {
764+ log .debug ("Notifying with event `{}`" , event .getClass ().getCanonicalName ());
765+ ol .accept (event );
766+ } else {
767+ log .debug ("OpenLineageSparkListener is null" );
768+ }
769+ }
770+
708771 private static final Class <?> adaptiveExecutionUpdateClass ;
709772 private static final MethodHandle adaptiveExecutionIdMethod ;
710773 private static final MethodHandle adaptiveSparkPlanMethod ;
@@ -765,6 +828,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
765828 if (metrics != null ) {
766829 metrics .setSpanMetrics (span );
767830 }
831+ notifyOl (this .openLineageSparkListener ::onOtherEvent , sqlEnd );
768832
769833 span .finish (sqlEnd .time () * 1000 );
770834 }
@@ -923,7 +987,7 @@ private void setDataJobsSamplingPriority(AgentSpan span) {
923987
924988 private AgentTracer .SpanBuilder buildSparkSpan (String spanName , Properties properties ) {
925989 AgentTracer .SpanBuilder builder =
926- tracer .buildSpan (spanName ).withSpanType ("spark" ).withTag ("app_id" , appId );
990+ tracer .buildSpan ("spark" , spanName ).withSpanType ("spark" ).withTag ("app_id" , appId );
927991
928992 if (databricksServiceName != null ) {
929993 builder .withServiceName (databricksServiceName );
@@ -1260,9 +1324,71 @@ private static String getDatabricksRunName(SparkConf conf) {
12601324 return null ;
12611325 }
12621326
1327+ private static String getAgentHttpUrl () {
1328+ StringBuilder sb =
1329+ new StringBuilder ("http://" )
1330+ .append (Config .get ().getAgentHost ())
1331+ .append (":" )
1332+ .append (Config .get ().getAgentPort ());
1333+ return sb .toString ();
1334+ }
1335+
12631336 @ SuppressForbidden // called at most once per spark application
12641337 private static String removeUuidFromEndOfString (String input ) {
12651338 return input .replaceAll (
12661339 "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$" , "" );
12671340 }
1341+
  /**
   * Attempts to load {@code className}, falling back through a chain of classloaders: first
   * {@code Class.forName}, then the context classloader of every live thread (and each one's
   * parent), and finally the system classloader. Returns an empty Optional when none succeed.
   * Every failure is logged at debug level only; this method never throws.
   */
  private Optional<Class> loadClass(String className) {
    Class clazz = null;
    // Snapshot the context classloaders of all live threads as fallback candidates.
    List<ClassLoader> availableClassloaders =
        Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getContextClassLoader)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    try {
      clazz = Class.forName(className);
    } catch (Exception e) {
      log.debug("Failed to load {} via Class.forName: {}", className, e.toString());
      for (ClassLoader classLoader : availableClassloaders) {
        try {
          clazz = classLoader.loadClass(className);
          log.debug("Loaded {} via classLoader: {}", className, classLoader);
          break;
        } catch (Exception ex) {
          log.debug(
              "Failed to load {} via loadClass via ClassLoader {} - {}",
              className,
              classLoader,
              ex.toString());
        }
        // NOTE(review): getParent() may be null (bootstrap loader); the resulting NPE is an
        // Exception and is swallowed by the catch below rather than guarded explicitly.
        try {
          clazz = classLoader.getParent().loadClass(className);
          log.debug(
              "Loaded {} via parent classLoader: {} for CL {}",
              className,
              classLoader.getParent(),
              classLoader);
          break;
        } catch (Exception ex) {
          log.debug(
              "Failed to load {} via loadClass via parent ClassLoader {} - {}",
              className,
              classLoader.getParent(),
              ex.toString());
        }
      }
    }
    // Last resort: the system classloader.
    if (clazz == null) {
      try {
        clazz = ClassLoader.getSystemClassLoader().loadClass(className);
        log.debug(
            "Loaded {} via system classLoader: {}", className, ClassLoader.getSystemClassLoader());
      } catch (Exception ex) {
        log.debug(
            "Failed to load {} via loadClass via SystemClassLoader {}", className, ex.toString());
      }
    }
    return Optional.ofNullable(clazz);
  }
12681394}
0 commit comments