16
16
17
17
package org .springframework .cloud .sleuth .instrument .messaging ;
18
18
19
+ import java .lang .reflect .Type ;
19
20
import java .util .Collections ;
20
21
import java .util .List ;
21
22
import java .util .Map ;
38
39
import org .springframework .messaging .support .MessageBuilder ;
39
40
import org .springframework .messaging .support .MessageHeaderAccessor ;
40
41
41
- import reactor .core .CorePublisher ;
42
42
import reactor .core .publisher .Flux ;
43
43
import reactor .core .publisher .Mono ;
44
44
@@ -93,89 +93,188 @@ protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocati
93
93
if (FunctionTypeUtils .isCollectionOfMessage (targetFunction .getOutputType ())) {
94
94
return targetFunction .apply (message ); // no instrumentation
95
95
}
96
- else if (targetFunction .isOutputTypePublisher ()) {
97
- return reactorStream ((Publisher <?> ) message , targetFunction );
96
+ else if (targetFunction .isInputTypePublisher () || targetFunction . isOutputTypePublisher ()) {
97
+ return reactorStream ((Publisher ) message , targetFunction );
98
98
}
99
99
return nonReactorStream ((Message <byte []>) message , targetFunction );
100
100
}
101
101
102
- private Object reactorStream (Publisher <?> message ,
102
+ private Object reactorStream (Publisher messageStream ,
103
103
SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction ) {
104
- if (message == null && targetFunction .isSupplier ()) { // Supplier
105
- return reactorStreamSupplier (message , targetFunction );
106
- } else if (!targetFunction .getRawInputType ().equals (Message .class )) {
104
+ if (messageStream == null && targetFunction .isSupplier ()) { // Supplier
105
+ return reactorStreamSupplier (messageStream , targetFunction );
106
+ }
107
+ Type itemType = FunctionTypeUtils .getGenericType (targetFunction .getInputType ());
108
+ Class <?> itemTypeClass = FunctionTypeUtils .getRawType (itemType );
109
+ if (!itemTypeClass .equals (Message .class )) {
107
110
if (log .isDebugEnabled ()) {
108
- log .debug ("Target function [" + targetFunction .getFunctionDefinition () + "] has raw input type [" + targetFunction .getRawInputType () + "] and should be [" + Message .class + "]. Will not wrap it." );
109
- return targetFunction .apply (message );
111
+ log .debug ("Target function [" + targetFunction .getFunctionDefinition () + "] has raw input type ["
112
+ + itemType + "] and should be [" + Message .class + "]. Will not wrap it." );
113
+ return targetFunction .apply (messageStream );
110
114
}
111
115
}
112
- // wyciagnij wiadomosc na wejsciu i utworz child span
113
- // wsadz child span do kontekstu
114
- // powtorz to co jest w supplierze
115
- Publisher <Message > publisher = (Publisher <Message >) targetFunction .get ();
116
+ Publisher <Message > messagePublisher = messageStream ;
117
+ if (FunctionTypeUtils .isMono (targetFunction .getInputType ())) {
118
+ return reactorMonoStream (targetFunction , messagePublisher );
119
+ }
120
+ return reactorFluxStream (targetFunction , messagePublisher );
121
+ }
116
122
117
- if (publisher instanceof Mono ) {
118
- Mono <Message > mono = (Mono <Message >) publisher ;
119
- publisher = mono
120
- .map (msg -> this .traceMessageHandler .wrapInputMessage (msg , inputDestination (targetFunction .getFunctionDefinition ())))
121
- .flatMap (msg -> Mono .deferContextual (Mono ::just )
122
- .doOnNext (contextView -> {
123
- Span span = contextView .get (Span .class );
124
- Tracer .SpanInScope scope = contextView .get (Tracer .SpanInScope .class );
125
- customizedInputMessageSpan (span , msg .msg );
126
- // @formatter:off
127
- return targetFunction .apply (msg .msg )
128
- // TODO: Fix me when this is resolved in Reactor
129
- // .doOnSubscribe(__ -> scope.close())
130
- .doOnError (span ::error )
131
- .doFinally (signalType -> {
132
- span .end ();
133
- scope .close ();
134
- }));
135
- // @formatter:on
136
- })
137
- .contextWrite (context -> {
138
- return ReactorSleuth .putSpanInScope (tracer , context , msg .childSpan );
123
+ private Object reactorMonoStream (SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction ,
124
+ Publisher <Message > messagePublisher ) {
125
+ if (log .isDebugEnabled ()) {
126
+ log .debug ("Will instrument a stream Mono function" );
127
+ }
128
+ Mono <Message > mono = Mono .from (messagePublisher )
129
+ // ensure there are no previous spans
130
+ .doOnNext (m -> tracer .withSpan (null ))
131
+ .map (msg -> this .traceMessageHandler .wrapInputMessage (msg ,
132
+ inputDestination (targetFunction .getFunctionDefinition ())))
133
+ .flatMap (msg -> Mono .deferContextual (ctx -> {
134
+ MessageAndSpansAndScope messageAndSpansAndScope = ctx .get (MessageAndSpansAndScope .class );
135
+ messageAndSpansAndScope .messageAndSpans = msg ;
136
+ messageAndSpansAndScope .span = msg .childSpan ;
137
+ setNameAndTag (targetFunction , msg .childSpan );
138
+ messageAndSpansAndScope .scope = tracer .withSpan (msg .childSpan );
139
+ return Mono .just (msg .msg );
140
+ }));
141
+ if (targetFunction .isConsumer ()) {
142
+ return targetFunction .apply (reactorStreamConsumer (mono ));
143
+ }
144
+ final Mono <Message > function = ((Mono <Message >) targetFunction .apply (mono ));
145
+ return Mono .deferContextual (contextView -> {
146
+ MessageAndSpansAndScope msg = contextView .get (MessageAndSpansAndScope .class );
147
+ return function .doOnNext (message -> {
148
+ msg .end ();
149
+ msg .handle ();
150
+ }).map (msgResult -> {
151
+ MessageAndSpan messageAndSpan = traceMessageHandler .wrapOutputMessage (msgResult ,
152
+ msg .messageAndSpans .parentSpan , outputDestination (targetFunction .getFunctionDefinition ()));
153
+ traceMessageHandler .afterMessageHandled (messageAndSpan .span , null );
154
+ return messageAndSpan .msg ;
155
+ })
156
+ // TODO: Fix me when this is resolved in Reactor
157
+ // .doOnSubscribe(__ -> scope.close())
158
+ .doOnError (msg ::error ).doFinally (signalType -> {
159
+ if (!msg .isHandled ()) {
160
+ msg .end ();
161
+ }
139
162
});
140
-
141
- } else {
142
- Flux flux = (Flux ) publisher ;
163
+ }).contextWrite (contextView -> contextView .put (MessageAndSpansAndScope .class , new MessageAndSpansAndScope ()));
164
+ }
143
165
166
+ private Object reactorFluxStream (SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction ,
167
+ Publisher <Message > messagePublisher ) {
168
+ if (log .isDebugEnabled ()) {
169
+ log .debug ("Will instrument a stream Flux function" );
144
170
}
145
- return publisher ;
171
+ Flux <Message > flux = Flux .from (messagePublisher )
172
+ // ensure there are no previous spans
173
+ .doOnNext (m -> tracer .withSpan (null ))
174
+ .map (msg -> this .traceMessageHandler .wrapInputMessage (msg ,
175
+ inputDestination (targetFunction .getFunctionDefinition ())))
176
+ .flatMap (msg -> Flux .deferContextual (ctx -> {
177
+ MessageAndSpansAndScope messageAndSpansAndScope = ctx .get (MessageAndSpansAndScope .class );
178
+ messageAndSpansAndScope .messageAndSpans = msg ;
179
+ messageAndSpansAndScope .span = msg .childSpan ;
180
+ setNameAndTag (targetFunction , msg .childSpan );
181
+ messageAndSpansAndScope .scope = tracer .withSpan (msg .childSpan );
182
+ return Mono .just (msg .msg );
183
+ }));
184
+ if (targetFunction .isConsumer ()) {
185
+ return targetFunction .apply (reactorStreamConsumer (flux ));
186
+ }
187
+ final Flux <Message > function = ((Flux <Message >) targetFunction .apply (flux ));
188
+ return Flux .deferContextual (contextView -> {
189
+ MessageAndSpansAndScope msg = contextView .get (MessageAndSpansAndScope .class );
190
+ return function .doOnNext (message -> {
191
+ msg .end ();
192
+ msg .handle ();
193
+ }).map (msgResult -> {
194
+ MessageAndSpan messageAndSpan = traceMessageHandler .wrapOutputMessage (msgResult ,
195
+ msg .messageAndSpans .parentSpan , outputDestination (targetFunction .getFunctionDefinition ()));
196
+ traceMessageHandler .afterMessageHandled (messageAndSpan .span , null );
197
+ return messageAndSpan .msg ;
198
+ })
199
+ // TODO: Fix me when this is resolved in Reactor
200
+ // .doOnSubscribe(__ -> scope.close())
201
+ .doOnError (msg ::error ).doFinally (signalType -> {
202
+ if (!msg .isHandled ()) {
203
+ msg .end ();
204
+ }
205
+ });
206
+ }).contextWrite (contextView -> contextView .put (MessageAndSpansAndScope .class , new MessageAndSpansAndScope ()));
207
+ }
208
+
209
+ private Object reactorStreamConsumer (Object result ) {
210
+ if (result instanceof Mono ) {
211
+ return Mono .deferContextual (contextView -> {
212
+ MessageAndSpansAndScope msg = contextView .get (MessageAndSpansAndScope .class );
213
+ return ((Mono <Message >) result )
214
+ // TODO: Fix me when this is resolved in Reactor
215
+ // .doOnSubscribe(__ -> scope.close())
216
+ .doOnError (msg ::error ).doFinally (signalType -> {
217
+ msg .end ();
218
+ });
219
+ }).contextWrite (
220
+ contextView -> contextView .put (MessageAndSpansAndScope .class , new MessageAndSpansAndScope ()));
221
+ }
222
+ return Flux .deferContextual (contextView -> {
223
+ MessageAndSpansAndScope msg = contextView .get (MessageAndSpansAndScope .class );
224
+ return ((Flux <Message >) result )
225
+ // TODO: Fix me when this is resolved in Reactor
226
+ // .doOnSubscribe(__ -> scope.close())
227
+ .doOnError (msg ::error ).doFinally (signalType -> {
228
+ msg .end ();
229
+ });
230
+ }).contextWrite (contextView -> contextView .put (MessageAndSpansAndScope .class , new MessageAndSpansAndScope ()));
146
231
}
147
232
148
233
private Object reactorStreamSupplier (Publisher <?> message ,
149
234
SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction ) {
150
235
Publisher <?> publisher = (Publisher <?>) targetFunction .get ();
151
236
if (publisher instanceof Mono ) {
237
+ if (log .isDebugEnabled ()) {
238
+ log .debug ("Will instrument a stream Mono supplier" );
239
+ }
152
240
Mono mono = (Mono ) publisher ;
153
241
publisher = ReactorSleuth .tracedMono (tracer , tracer .currentTraceContext (),
154
242
targetFunction .getFunctionDefinition (), () -> mono , (msg , s ) -> {
155
- customizedInputMessageSpan (s , msg instanceof Message ? (Message ) msg : null ); // (1)
243
+ customizedInputMessageSpan (s , msg instanceof Message ? (Message ) msg : null );
156
244
}).map (object -> toMessage (object ))
157
245
.map (object -> this .getMessageAndSpans ((Message ) object , targetFunction .getFunctionDefinition (),
158
- tracer .currentSpan ()))
246
+ setNameAndTag (targetFunction , tracer .currentSpan ())))
247
+ .doOnNext (wrappedOutputMessage -> customizedOutputMessageSpan (
248
+ ((MessageAndSpan ) wrappedOutputMessage ).span , ((MessageAndSpan ) wrappedOutputMessage ).msg ))
159
249
.doOnNext (wrappedOutputMessage -> traceMessageHandler
160
250
.afterMessageHandled (((MessageAndSpan ) wrappedOutputMessage ).span , null ))
161
251
.map (wrappedOutputMessage -> ((MessageAndSpan ) wrappedOutputMessage ).msg );
162
252
}
163
253
else {
254
+ if (log .isDebugEnabled ()) {
255
+ log .debug ("Will instrument a stream Flux supplier" );
256
+ }
164
257
Flux flux = (Flux ) publisher ;
165
- // (1) zaczyna
166
258
publisher = ReactorSleuth .tracedFlux (tracer , tracer .currentTraceContext (),
167
259
targetFunction .getFunctionDefinition (), () -> flux , (msg , s ) -> {
168
- customizedInputMessageSpan (s , msg instanceof Message ? (Message ) msg : null ); // (1)
260
+ customizedInputMessageSpan (s , msg instanceof Message ? (Message ) msg : null );
169
261
}).map (object -> toMessage (object ))
170
262
.map (object -> this .getMessageAndSpans ((Message ) object , targetFunction .getFunctionDefinition (),
171
- tracer .currentSpan ()))
263
+ setNameAndTag (targetFunction , tracer .currentSpan ())))
264
+ .doOnNext (wrappedOutputMessage -> customizedOutputMessageSpan (
265
+ ((MessageAndSpan ) wrappedOutputMessage ).span , ((MessageAndSpan ) wrappedOutputMessage ).msg ))
172
266
.doOnNext (wrappedOutputMessage -> traceMessageHandler
173
267
.afterMessageHandled (((MessageAndSpan ) wrappedOutputMessage ).span , null ))
174
268
.map (wrappedOutputMessage -> ((MessageAndSpan ) wrappedOutputMessage ).msg );
175
269
}
176
270
return publisher ;
177
271
}
178
272
273
+ private Span setNameAndTag (SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction , Span span ) {
274
+ return span .name (targetFunction .getFunctionDefinition ()).tag (SleuthMessagingSpan .Tags .FUNCTION_NAME .getKey (),
275
+ targetFunction .getFunctionDefinition ());
276
+ }
277
+
179
278
private Object nonReactorStream (Message <byte []> message ,
180
279
SimpleFunctionRegistry .FunctionInvocationWrapper targetFunction ) {
181
280
MessageAndSpans invocationMessage = null ;
@@ -184,7 +283,7 @@ private Object nonReactorStream(Message<byte[]> message,
184
283
if (log .isDebugEnabled ()) {
185
284
log .debug ("Creating a span for a supplier" );
186
285
}
187
- span = this .tracer .nextSpan (). name ( targetFunction . getFunctionDefinition ());
286
+ span = setNameAndTag ( targetFunction , this .tracer .nextSpan ());
188
287
customizedInputMessageSpan (span , null );
189
288
}
190
289
else {
@@ -196,7 +295,7 @@ private Object nonReactorStream(Message<byte[]> message,
196
295
if (log .isDebugEnabled ()) {
197
296
log .debug ("Wrapped input msg " + invocationMessage );
198
297
}
199
- span = invocationMessage .childSpan ;
298
+ span = setNameAndTag ( targetFunction , invocationMessage .childSpan ) ;
200
299
}
201
300
Object result ;
202
301
Throwable throwable = null ;
@@ -243,6 +342,10 @@ private void customizedInputMessageSpan(Span spanToCustomize, Message<?> msg) {
243
342
this .customizers .forEach (cust -> cust .customizeInputMessageSpan (spanToCustomize , msg ));
244
343
}
245
344
345
+ private void customizedOutputMessageSpan (Span spanToCustomize , Message <?> msg ) {
346
+ this .customizers .forEach (cust -> cust .customizeOutputMessageSpan (spanToCustomize , msg ));
347
+ }
348
+
246
349
private Message <?> toMessage (Object result ) {
247
350
if (!(result instanceof Message )) {
248
351
return MessageBuilder .withPayload (result ).build ();
@@ -276,4 +379,39 @@ public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
276
379
this .functionToDestinationCache .clear ();
277
380
}
278
381
382
+ static class MessageAndSpansAndScope {
383
+
384
+ MessageAndSpans messageAndSpans ;
385
+
386
+ Span span ;
387
+
388
+ Tracer .SpanInScope scope ;
389
+
390
+ boolean handled ;
391
+
392
+ void error (Throwable throwable ) {
393
+ if (this .span != null ) {
394
+ this .span .error (throwable );
395
+ }
396
+ }
397
+
398
+ void handle () {
399
+ this .handled = true ;
400
+ }
401
+
402
+ boolean isHandled () {
403
+ return this .handled ;
404
+ }
405
+
406
+ void end () {
407
+ if (this .span != null ) {
408
+ this .span .end ();
409
+ }
410
+ if (this .scope != null ) {
411
+ this .scope .close ();
412
+ }
413
+ }
414
+
415
+ }
416
+
279
417
}
0 commit comments