1313import io .lumigo .core .utils .JsonUtils ;
1414import io .lumigo .core .utils .SecretScrubber ;
1515import io .lumigo .core .utils .StringUtils ;
16+ import io .lumigo .models .*;
1617import io .lumigo .models .HttpSpan ;
17- import io .lumigo .models .Reportable ;
1818import io .lumigo .models .Span ;
1919import java .io .*;
2020import java .util .*;
2121import java .util .concurrent .Callable ;
22+ import lombok .Getter ;
2223import org .apache .http .Header ;
2324import org .apache .http .HttpEntity ;
2425import org .apache .http .HttpResponse ;
2526import org .apache .http .client .methods .HttpEntityEnclosingRequestBase ;
2627import org .apache .http .client .methods .HttpUriRequest ;
28+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
29+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
30+ import org .apache .kafka .clients .consumer .internals .ConsumerMetadata ;
31+ import org .apache .kafka .clients .producer .ProducerRecord ;
32+ import org .apache .kafka .clients .producer .RecordMetadata ;
33+ import org .apache .kafka .clients .producer .internals .ProducerMetadata ;
34+ import org .apache .kafka .common .serialization .Serializer ;
2735import org .pmw .tinylog .Logger ;
2836import software .amazon .awssdk .awscore .AwsResponse ;
2937import software .amazon .awssdk .core .SdkResponse ;
@@ -41,14 +49,16 @@ public class SpansContainer {
4149 private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID" ;
4250 private static final String FUNCTION_SPAN_TYPE = "function" ;
4351 private static final String HTTP_SPAN_TYPE = "http" ;
44- private static final SecretScrubber secretScrubber = new SecretScrubber ( new EnvUtil (). getEnv ()) ;
52+ public static final String KAFKA_SPAN_TYPE = "kafka" ;
4553
4654 private Span baseSpan ;
47- private Span startFunctionSpan ;
55+ @ Getter private Span startFunctionSpan ;
4856 private Long rttDuration ;
4957 private Span endFunctionSpan ;
5058 private Reporter reporter ;
51- private List <HttpSpan > httpSpans = new LinkedList <>();
59+ private SecretScrubber secretScrubber = new SecretScrubber (new EnvUtil ().getEnv ());
60+ @ Getter private List <BaseSpan > spans = new LinkedList <>();
61+
5262 private static final SpansContainer ourInstance = new SpansContainer ();
5363
5464 public static SpansContainer getInstance () {
@@ -63,14 +73,15 @@ public void clear() {
6373 rttDuration = null ;
6474 endFunctionSpan = null ;
6575 reporter = null ;
66- httpSpans = new LinkedList <>();
76+ spans = new LinkedList <>();
6777 }
6878
6979 private SpansContainer () {}
7080
7181 public void init (Map <String , String > env , Reporter reporter , Context context , Object event ) {
7282 this .clear ();
7383 this .reporter = reporter ;
84+ this .secretScrubber = new SecretScrubber (new EnvUtil ().getEnv ());
7485
7586 int javaVersion = AwsUtils .parseJavaVersion (System .getProperty ("java.version" ));
7687 if (javaVersion > 11 ) {
@@ -81,6 +92,7 @@ public void init(Map<String, String> env, Reporter reporter, Context context, Ob
8192 Logger .debug ("awsTracerId {}" , awsTracerId );
8293
8394 AwsUtils .TriggeredBy triggeredBy = AwsUtils .extractTriggeredByFromEvent (event );
95+
8496 long startTime = System .currentTimeMillis ();
8597 this .baseSpan =
8698 Span .builder ()
@@ -166,8 +178,7 @@ public void start() {
166178 .build ();
167179
168180 try {
169- rttDuration =
170- reporter .reportSpans (prepareToSend (startFunctionSpan , false ), MAX_REQUEST_SIZE );
181+ rttDuration = reporter .reportSpans (prepareToSend (startFunctionSpan ), MAX_REQUEST_SIZE );
171182 } catch (Throwable e ) {
172183 Logger .error (e , "Failed to send start span" );
173184 }
@@ -214,25 +225,17 @@ private void end(Span endFunctionSpan) throws IOException {
214225 MAX_REQUEST_SIZE );
215226 }
216227
217- public Span getStartFunctionSpan () {
218- return startFunctionSpan ;
219- }
220-
221- public List <Reportable > getAllCollectedSpans () {
222- List <Reportable > spans = new LinkedList <>();
228+ public List <BaseSpan > getAllCollectedSpans () {
229+ List <BaseSpan > spans = new LinkedList <>();
223230 spans .add (endFunctionSpan );
224- spans .addAll (httpSpans );
231+ spans .addAll (this . spans );
225232 return spans ;
226233 }
227234
228235 public Span getEndSpan () {
229236 return endFunctionSpan ;
230237 }
231238
232- public List <HttpSpan > getHttpSpans () {
233- return httpSpans ;
234- }
235-
236239 private String getStackTrace (Throwable throwable ) {
237240 StringWriter sw = new StringWriter ();
238241 PrintWriter pw = new PrintWriter (sw , true );
@@ -307,7 +310,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
307310 response .getStatusLine ().getStatusCode ())
308311 .build ())
309312 .build ());
310- httpSpans .add (httpSpan );
313+ this . spans .add (httpSpan );
311314 }
312315
313316 public void addHttpSpan (Long startTime , Request <?> request , Response <?> response ) {
@@ -366,7 +369,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
366369 .build ());
367370 AwsSdkV1ParserFactory .getParser (request .getServiceName ())
368371 .safeParse (httpSpan , request , response );
369- httpSpans .add (httpSpan );
372+ this . spans .add (httpSpan );
370373 }
371374
372375 public void addHttpSpan (
@@ -435,7 +438,37 @@ public void addHttpSpan(
435438 executionAttributes .getAttribute (SdkExecutionAttribute .SERVICE_NAME ))
436439 .safeParse (httpSpan , context );
437440
438- httpSpans .add (httpSpan );
441+ this .spans .add (httpSpan );
442+ }
443+
444+ public <K , V > void addKafkaProduceSpan (
445+ Long startTime ,
446+ Serializer <K > keySerializer ,
447+ Serializer <V > valueSerializer ,
448+ ProducerMetadata producerMetadata ,
449+ ProducerRecord <K , V > record ,
450+ RecordMetadata recordMetadata ,
451+ Exception exception ) {
452+ this .spans .add (
453+ KafkaSpanFactory .createProduce (
454+ this .baseSpan ,
455+ startTime ,
456+ keySerializer ,
457+ valueSerializer ,
458+ producerMetadata ,
459+ record ,
460+ recordMetadata ,
461+ exception ));
462+ }
463+
464+ public void addKafkaConsumeSpan (
465+ Long startTime ,
466+ KafkaConsumer <?, ?> consumer ,
467+ ConsumerMetadata consumerMetadata ,
468+ ConsumerRecords <?, ?> consumerRecords ) {
469+ this .spans .add (
470+ KafkaSpanFactory .createConsume (
471+ this .baseSpan , startTime , consumer , consumerMetadata , consumerRecords ));
439472 }
440473
441474 private static String extractHeaders (Map <String , String > headers ) {
@@ -522,18 +555,18 @@ protected static <T> T callIfVerbose(Callable<T> method) {
522555 }
523556 }
524557
525- private Reportable prepareToSend (Reportable span , boolean hasError ) {
526- return reduceSpanSize (span .scrub (secretScrubber ), hasError );
558+ private BaseSpan prepareToSend (BaseSpan span ) {
559+ return reduceSpanSize (span .scrub (secretScrubber ), false );
527560 }
528561
529- private List <Reportable > prepareToSend (List <Reportable > spans , boolean hasError ) {
530- for (Reportable span : spans ) {
562+ private List <BaseSpan > prepareToSend (List <BaseSpan > spans , boolean hasError ) {
563+ for (BaseSpan span : spans ) {
531564 reduceSpanSize (span .scrub (secretScrubber ), hasError );
532565 }
533566 return spans ;
534567 }
535568
536- public Reportable reduceSpanSize (Reportable span , boolean hasError ) {
569+ public BaseSpan reduceSpanSize (BaseSpan span , boolean hasError ) {
537570 int maxFieldSize =
538571 hasError
539572 ? Configuration .getInstance ().maxSpanFieldSizeWhenError ()
0 commit comments