@@ -1,5 +1,10 @@
 package datadog.trace.instrumentation.spark;
 
+import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import datadog.trace.api.Config;
@@ -23,6 +28,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -58,6 +65,8 @@
  */
 public abstract class AbstractDatadogSparkListener extends SparkListener {
   private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
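+  // Jackson mapper shared by the listener; ObjectMapper is thread-safe once configured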
+  private static final ObjectMapper objectMapper = new ObjectMapper();
   public static volatile AbstractDatadogSparkListener listener = null;
   public static volatile boolean finishTraceOnApplicationEnd = true;
   public static volatile boolean isPysparkShell = false;
@@ -889,6 +898,9 @@ private synchronized void onStreamingQueryProgressEvent(
       batchSpan.setTag(prefix + "num_input_rows", source.numInputRows());
       batchSpan.setTag(prefix + "input_rows_per_second", source.inputRowsPerSecond());
       batchSpan.setTag(prefix + "processed_rows_per_second", source.processedRowsPerSecond());
+
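+      // report Kafka end offsets to Data Streams Monitoring (helper added below)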
+      reportKafkaOffsets(batchSpan.getServiceName(), batchSpan, source);
     }
 
     for (int i = 0; i < progress.stateOperators().length; i++) {
@@ -1182,6 +1194,51 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
     return sparkAppName;
   }
 
+  private static void reportKafkaOffsets(
+      final String appName, final AgentSpan span, final SourceProgress progress) {
+    if (!span.traceConfig().isDataStreamsEnabled()
+        || progress == null
+        || progress.description() == null) {
+      return;
+    }
+
+    // check if this is a Kafka source
+    if (progress.description().toLowerCase().startsWith("kafka")) {
+      try {
+        // parse offsets from the endOffset JSON, reported per topic in the format:
+        // {"topic": {"partition": offset}}
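+        // e.g. {"my_topic": {"0": 5010, "1": 4998}} (illustrative values)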
+        JsonNode jsonNode = objectMapper.readTree(progress.endOffset());
+        Iterator<String> topics = jsonNode.fieldNames();
+        // report offsets for all topics / partitions
+        while (topics.hasNext()) {
+          String topic = topics.next();
+          JsonNode topicNode = jsonNode.get(topic);
+          // iterate through reported partitions
+          Iterator<String> allPartitions = topicNode.fieldNames();
+          // DSM tags, inserted in sorted key order (LinkedHashMap preserves it)
+          LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
+          sortedTags.put(CONSUMER_GROUP_TAG, appName);
+          // placeholder, overwritten for each partition below
+          sortedTags.put(PARTITION_TAG, "");
+          sortedTags.put(TOPIC_TAG, topic);
+          sortedTags.put(TYPE_TAG, "kafka_commit");
+
+          while (allPartitions.hasNext()) {
+            String partition = allPartitions.next();
+            sortedTags.put(PARTITION_TAG, partition);
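+            // record this partition's end offset with the DSM backlog tracker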
+            AgentTracer.get()
+                .getDataStreamsMonitoring()
+                .trackBacklog(sortedTags, topicNode.get(partition).asLong());
+          }
+        }
+      } catch (Throwable e) {
+        log.debug("Failed to parse kafka offsets", e);
+      }
+    }
+  }
+
   private static String getDatabricksRunName(SparkConf conf) {
     String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
     if (allTags == null) {
@@ -1191,7 +1248,7 @@ private static String getDatabricksRunName(SparkConf conf) {
     try {
       // Using the jackson JSON lib used by spark
       // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
-      ObjectMapper objectMapper = new ObjectMapper();
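+      // reuse the shared static objectMapper declared at the top of the class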
       JsonNode jsonNode = objectMapper.readTree(allTags);
 
       for (JsonNode node : jsonNode) {