16
16
17
17
package org .springframework .cloud .sleuth .instrument .messaging ;
18
18
19
+ import java .time .Duration ;
19
20
import java .util .Collections ;
20
21
import java .util .List ;
22
+ import java .util .function .Consumer ;
21
23
import java .util .function .Function ;
22
24
import java .util .function .Supplier ;
23
25
24
26
import com .fasterxml .jackson .databind .ObjectMapper ;
27
+ import org .junit .jupiter .api .Disabled ;
25
28
import org .junit .jupiter .api .Test ;
26
- import org . mockito . Answers ;
27
- import org . mockito . BDDMockito ;
29
+ import reactor . core . publisher . Flux ;
30
+ import reactor . core . publisher . Mono ;
28
31
29
32
import org .springframework .cloud .function .context .FunctionRegistration ;
30
33
import org .springframework .cloud .function .context .FunctionType ;
33
36
import org .springframework .cloud .function .context .config .JsonMessageConverter ;
34
37
import org .springframework .cloud .function .json .JacksonMapper ;
35
38
import org .springframework .cloud .sleuth .Span ;
36
- import org .springframework .cloud .sleuth .TraceContext ;
37
39
import org .springframework .cloud .sleuth .Span .Builder ;
40
+ import org .springframework .cloud .sleuth .TraceContext ;
38
41
import org .springframework .cloud .sleuth .propagation .Propagator ;
39
- import org .springframework .cloud .sleuth .propagation .Propagator .Getter ;
40
- import org .springframework .cloud .sleuth .propagation .Propagator .Setter ;
41
42
import org .springframework .cloud .sleuth .tracer .SimpleTracer ;
42
43
import org .springframework .core .convert .support .DefaultConversionService ;
43
- import org .springframework .integration .support .MessageBuilder ;
44
44
import org .springframework .messaging .Message ;
45
45
import org .springframework .messaging .converter .CompositeMessageConverter ;
46
+ import org .springframework .messaging .support .MessageBuilder ;
46
47
import org .springframework .mock .env .MockEnvironment ;
47
48
48
49
import static org .assertj .core .api .Assertions .assertThat ;
49
50
import static org .assertj .core .api .BDDAssertions .then ;
50
51
51
52
class TraceFunctionAroundWrapperTests {
52
-
53
+
53
54
CompositeMessageConverter messageConverter = new CompositeMessageConverter (
54
55
Collections .singletonList (new JsonMessageConverter (new JacksonMapper (new ObjectMapper ()))));
55
-
56
+
56
57
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry (new DefaultConversionService (), messageConverter ,
57
58
new JacksonMapper (new ObjectMapper ()));
58
59
59
60
SimpleTracer tracer = new SimpleTracer ();
60
-
61
+
61
62
MockEnvironment mockEnvironment = mockEnvironment ();
62
63
63
- TraceFunctionAroundWrapper wrapper = new TraceFunctionAroundWrapper (mockEnvironment , tracer , testPropagator (),
64
+ TraceFunctionAroundWrapper wrapper = new TraceFunctionAroundWrapper (mockEnvironment , tracer , testPropagator (),
64
65
new MessageHeaderPropagatorSetter (), new MessageHeaderPropagatorGetter ()) {
65
66
@ Override
66
67
MessageAndSpan getMessageAndSpans (Message <?> resultMessage , String name , Span spanFromMessage ) {
67
68
return new MessageAndSpan (resultMessage , spanFromMessage );
68
69
}
69
70
};
70
71
71
- /**
72
- * @return
73
- */
74
72
private Propagator testPropagator () {
75
73
return new Propagator () {
76
-
74
+
77
75
@ Override
78
76
public <C > void inject (TraceContext context , C carrier , Setter <C > setter ) {
79
77
setter .set (carrier , "superHeader" , "test" );
80
78
}
81
-
79
+
82
80
@ Override
83
81
public List <String > fields () {
84
82
return Collections .singletonList ("superHeader" );
85
83
}
86
-
84
+
87
85
@ Override
88
86
public <C > Builder extract (C carrier , Getter <C > getter ) {
89
- Builder builder = BDDMockito .mock (Builder .class , Answers .RETURNS_SELF );
90
- BDDMockito .given (builder .start ()).willReturn (tracer .nextSpan ());
91
- return builder ;
87
+ return tracer .spanBuilder ();
92
88
}
93
89
};
94
90
}
95
-
91
+
96
92
@ Test
97
93
void test_tracing_with_supplier () {
98
94
FunctionRegistration <Greeter > registration = new FunctionRegistration <>(new Greeter (), "greeter" )
99
95
.type (FunctionType .of (Greeter .class ));
100
96
catalog .register (registration );
101
97
FunctionInvocationWrapper function = catalog .lookup ("greeter" );
102
-
98
+
103
99
Message <?> result = (Message <?>) wrapper .apply (null , function );
104
-
100
+
105
101
assertThat (result .getPayload ()).isEqualTo ("hello" );
106
102
assertThat (tracer .getOnlySpan ().name ).isEqualTo ("greeter" );
107
103
}
108
-
104
+
109
105
@ Test
110
106
void test_tracing_with_function () {
111
- FunctionRegistration <GreeterFunction > registration = new FunctionRegistration <>(new GreeterFunction (), "greeter" )
112
- .type (FunctionType .of (GreeterFunction .class ));
107
+ FunctionRegistration <GreeterFunction > registration = new FunctionRegistration <>(new GreeterFunction (),
108
+ "greeter" ) .type (FunctionType .of (GreeterFunction .class ));
113
109
catalog .register (registration );
114
110
FunctionInvocationWrapper function = catalog .lookup ("greeter" );
115
-
116
- Message <?> result = (Message <?>) wrapper . apply ( MessageBuilder
117
- .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build (), function );
118
-
111
+
112
+ Message <?> result = (Message <?>) wrapper
113
+ .apply ( MessageBuilder . withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build (), function );
114
+
119
115
assertThat (result .getPayload ()).isEqualTo ("HELLO" );
116
+ assertThat (tracer .spans ).hasSize (3 );
117
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
118
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
119
+ assertThat (tracer .spans .get (2 ).name ).isEqualTo ("send" );
120
+ }
121
+
122
+ @ Test
123
+ void test_tracing_with_consumer () {
124
+ GreeterConsumer consumer = new GreeterConsumer ();
125
+ FunctionRegistration <GreeterConsumer > registration = new FunctionRegistration <>(consumer , "greeter" )
126
+ .type (FunctionType .of (GreeterConsumer .class ));
127
+ catalog .register (registration );
128
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
129
+
130
+ wrapper .apply (MessageBuilder .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build (), function );
131
+
132
+ assertThat (consumer .result ).isEqualTo ("HELLO" );
133
+ assertThat (tracer .spans ).hasSize (2 );
134
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
135
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
136
+ }
137
+
138
+ @ Disabled ("TODO: don't we want to support this?" )
139
+ @ Test
140
+ void should_trace_when_reactive_mono_supplier () {
141
+ FunctionRegistration <ReactiveMonoGreeter > registration = new FunctionRegistration <>(new ReactiveMonoGreeter (),
142
+ "greeter" ).type (FunctionType .of (ReactiveMonoGreeter .class ));
143
+ catalog .register (registration );
144
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
145
+
146
+ Message <?> result = ((Mono <Message <?>>) wrapper .apply (null , function )).block (Duration .ofSeconds (5 ));
147
+
148
+ assertThat (result .getPayload ()).isEqualTo ("hello" );
120
149
assertThat (tracer .getOnlySpan ().name ).isEqualTo ("greeter" );
121
150
}
122
151
152
+ @ Test
153
+ void should_trace_when_reactive_mono_function () {
154
+ FunctionRegistration <ReactiveMonoGreeterFunction > registration = new FunctionRegistration <>(
155
+ new ReactiveMonoGreeterFunction (), "greeter" ).type (FunctionType .of (ReactiveMonoGreeterFunction .class ));
156
+ catalog .register (registration );
157
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
158
+
159
+ Message <?> result = ((Mono <Message <?>>) wrapper .apply (
160
+ Mono .just (MessageBuilder .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build ()), function ))
161
+ .block (Duration .ofSeconds (5 ));
162
+
163
+ assertThat (result .getPayload ()).isEqualTo ("HELLO" );
164
+ assertThat (tracer .spans ).hasSize (3 );
165
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
166
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
167
+ assertThat (tracer .spans .get (2 ).name ).isEqualTo ("send" );
168
+ }
169
+
170
+ @ Disabled ("TODO: don't we want to support this?" )
171
+ @ Test
172
+ void should_trace_when_reactive_mono_consumer () {
173
+ ReactiveMonoGreeterConsumer consumer = new ReactiveMonoGreeterConsumer ();
174
+ FunctionRegistration <ReactiveMonoGreeterConsumer > registration = new FunctionRegistration <>(consumer , "greeter" )
175
+ .type (FunctionType .of (ReactiveMonoGreeterConsumer .class ));
176
+ catalog .register (registration );
177
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
178
+
179
+ wrapper .apply (Mono .just (MessageBuilder .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build ()),
180
+ function );
181
+
182
+ assertThat (consumer .result ).isEqualTo ("HELLO" );
183
+ assertThat (tracer .spans ).hasSize (2 );
184
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
185
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
186
+ }
187
+
188
+ @ Disabled ("TODO: don't we want to support this?" )
189
+ @ Test
190
+ void should_trace_when_reactive_flux_supplier () {
191
+ FunctionRegistration <ReactiveFluxGreeter > registration = new FunctionRegistration <>(new ReactiveFluxGreeter (),
192
+ "greeter" ).type (FunctionType .of (ReactiveFluxGreeter .class ));
193
+ catalog .register (registration );
194
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
195
+
196
+ Message <?> result = ((Flux <Message <?>>) wrapper .apply (null , function )).blockFirst (Duration .ofSeconds (5 ));
197
+
198
+ assertThat (result .getPayload ()).isEqualTo ("hello" );
199
+ assertThat (tracer .getOnlySpan ().name ).isEqualTo ("greeter" );
200
+ }
201
+
202
+ @ Test
203
+ void should_trace_when_reactive_flux_function () {
204
+ FunctionRegistration <ReactiveFluxGreeterFunction > registration = new FunctionRegistration <>(
205
+ new ReactiveFluxGreeterFunction (), "greeter" ).type (FunctionType .of (ReactiveFluxGreeterFunction .class ));
206
+ catalog .register (registration );
207
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
208
+
209
+ Message <?> result = ((Flux <Message <?>>) wrapper .apply (
210
+ Flux .just (MessageBuilder .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build ()), function ))
211
+ .blockFirst (Duration .ofSeconds (5 ));
212
+
213
+ assertThat (result .getPayload ()).isEqualTo ("HELLO" );
214
+ assertThat (tracer .spans ).hasSize (3 );
215
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
216
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
217
+ assertThat (tracer .spans .get (2 ).name ).isEqualTo ("send" );
218
+ }
219
+
220
+ @ Test
221
+ void should_trace_when_reactive_flux_consumer () {
222
+ ReactiveFluxGreeterConsumer consumer = new ReactiveFluxGreeterConsumer ();
223
+ FunctionRegistration <ReactiveFluxGreeterConsumer > registration = new FunctionRegistration <>(consumer , "greeter" )
224
+ .type (FunctionType .of (ReactiveFluxGreeterConsumer .class ));
225
+ catalog .register (registration );
226
+ FunctionInvocationWrapper function = catalog .lookup ("greeter" );
227
+
228
+ wrapper .apply (Flux .just (MessageBuilder .withPayload ("hello" ).setHeader ("superHeader" , "someValue" ).build ()),
229
+ function );
230
+
231
+ assertThat (consumer .result ).isEqualTo ("HELLO" );
232
+ assertThat (tracer .spans ).hasSize (2 );
233
+ assertThat (tracer .spans .get (0 ).name ).isEqualTo ("handle" );
234
+ assertThat (tracer .spans .get (1 ).name ).isEqualTo ("greeter" );
235
+ }
236
+
123
237
@ Test
124
238
void should_clear_cache_on_refresh () {
125
239
TraceFunctionAroundWrapper wrapper = new TraceFunctionAroundWrapper (null , null , null , null , null );
@@ -160,7 +274,7 @@ void should_point_to_proper_destination_when_working_with_remapped_functions() {
160
274
161
275
assertThat (wrapper .outputDestination ("marcin" )).isEqualTo ("bob" );
162
276
}
163
-
277
+
164
278
private MockEnvironment mockEnvironment () {
165
279
MockEnvironment mockEnvironment = new MockEnvironment ();
166
280
mockEnvironment .setProperty ("spring.cloud.stream.bindings.greeter-in-0.destination" , "oleg" );
@@ -176,14 +290,87 @@ public String get() {
176
290
}
177
291
178
292
}
179
-
293
+
180
294
private static class GreeterFunction implements Function <String , String > {
181
-
295
+
182
296
@ Override
183
297
public String apply (String in ) {
184
298
return in .toUpperCase ();
185
299
}
186
-
300
+
301
+ }
302
+
303
+ private static class GreeterConsumer implements Consumer <String > {
304
+
305
+ String result ;
306
+
307
+ @ Override
308
+ public void accept (String in ) {
309
+ this .result = in .toUpperCase ();
310
+ }
311
+
312
+ }
313
+
314
+ private static class ReactiveMonoGreeter implements Supplier <Mono <Message <String >>> {
315
+
316
+ @ Override
317
+ public Mono <Message <String >> get () {
318
+ return Mono .just (MessageBuilder .withPayload ("hello" ).build ());
319
+ }
320
+
321
+ }
322
+
323
+ private static class ReactiveMonoGreeterFunction implements Function <Mono <Message <String >>, Mono <Message <String >>> {
324
+
325
+ @ Override
326
+ public Mono <Message <String >> apply (Mono <Message <String >> in ) {
327
+ return in .map (s -> MessageBuilder .fromMessage (s ).withPayload (s .getPayload ().toUpperCase ()).build ());
328
+ }
329
+
330
+ }
331
+
332
+ private static class ReactiveMonoGreeterConsumer implements Consumer <Mono <Message <String >>> {
333
+
334
+ String result ;
335
+
336
+ @ Override
337
+ public void accept (Mono <Message <String >> in ) {
338
+ in .map (s -> s .getPayload ().toUpperCase ()).doOnNext (s -> {
339
+ result = s ;
340
+ }).subscribe ();
341
+ }
342
+
343
+ }
344
+
345
+ private static class ReactiveFluxGreeter implements Supplier <Flux <Message <String >>> {
346
+
347
+ @ Override
348
+ public Flux <Message <String >> get () {
349
+ return Flux .just (MessageBuilder .withPayload ("hello" ).build ());
350
+ }
351
+
352
+ }
353
+
354
+ private static class ReactiveFluxGreeterFunction implements Function <Flux <Message <String >>, Flux <Message <String >>> {
355
+
356
+ @ Override
357
+ public Flux <Message <String >> apply (Flux <Message <String >> in ) {
358
+ return in .map (s -> MessageBuilder .fromMessage (s ).withPayload (s .getPayload ().toUpperCase ()).build ());
359
+ }
360
+
361
+ }
362
+
363
+ private static class ReactiveFluxGreeterConsumer implements Consumer <Flux <Message <String >>> {
364
+
365
+ String result ;
366
+
367
+ @ Override
368
+ public void accept (Flux <Message <String >> in ) {
369
+ in .map (s -> s .getPayload ().toUpperCase ()).doOnNext (s -> {
370
+ result = s ;
371
+ }).subscribe ();
372
+ }
373
+
187
374
}
188
375
189
376
}
0 commit comments