1
+ package com .didispace .stream ;
2
+
3
+ import lombok .extern .slf4j .Slf4j ;
4
+ import org .springframework .beans .factory .annotation .Autowired ;
5
+ import org .springframework .boot .SpringApplication ;
6
+ import org .springframework .boot .autoconfigure .SpringBootApplication ;
7
+ import org .springframework .cloud .stream .annotation .EnableBinding ;
8
+ import org .springframework .cloud .stream .annotation .Input ;
9
+ import org .springframework .cloud .stream .annotation .Output ;
10
+ import org .springframework .cloud .stream .annotation .StreamListener ;
11
+ import org .springframework .integration .support .MessageBuilder ;
12
+ import org .springframework .messaging .MessageChannel ;
13
+ import org .springframework .messaging .SubscribableChannel ;
14
+ import org .springframework .messaging .handler .annotation .Header ;
15
+ import org .springframework .stereotype .Component ;
16
+ import org .springframework .web .bind .annotation .GetMapping ;
17
+ import org .springframework .web .bind .annotation .RequestParam ;
18
+ import org .springframework .web .bind .annotation .RestController ;
19
+
20
+
21
+ @ EnableBinding (TestApplication .TestTopic .class )
22
+ @ SpringBootApplication
23
+ public class TestApplication {
24
+
25
+ public static void main (String [] args ) {
26
+ SpringApplication .run (TestApplication .class , args );
27
+ }
28
+
29
+ @ RestController
30
+ static class TestController {
31
+
32
+ @ Autowired
33
+ private TestTopic testTopic ;
34
+
35
+ /**
36
+ * 消息生产接口
37
+ *
38
+ * @param message
39
+ * @return
40
+ */
41
+ @ GetMapping ("/sendMessage" )
42
+ public String messageWithMQ (@ RequestParam String message ) {
43
+ testTopic .output ().send (MessageBuilder .withPayload (message ).setHeader ("version" , "1.0" ).build ());
44
+ testTopic .output ().send (MessageBuilder .withPayload (message ).setHeader ("version" , "2.0" ).build ());
45
+ return "ok" ;
46
+ }
47
+
48
+ }
49
+
50
+ /**
51
+ * 消息消费逻辑
52
+ */
53
+ @ Slf4j
54
+ @ Component
55
+ static class TestListener {
56
+
57
+ @ StreamListener (value = TestTopic .INPUT , condition = "headers['version']=='1.0'" )
58
+ public void receiveV1 (String payload , @ Header ("version" ) String version ) {
59
+ log .info ("Received v1 : " + payload + ", " + version );
60
+ }
61
+
62
+ @ StreamListener (value = TestTopic .INPUT , condition = "headers['version']=='2.0'" )
63
+ public void receiveV2 (String payload , @ Header ("version" ) String version ) {
64
+ log .info ("Received v2 : " + payload + ", " + version );
65
+ }
66
+
67
+ }
68
+
69
+ interface TestTopic {
70
+
71
+ String OUTPUT = "example-topic-output" ;
72
+ String INPUT = "example-topic-input" ;
73
+
74
+ @ Output (OUTPUT )
75
+ MessageChannel output ();
76
+
77
+ @ Input (INPUT )
78
+ SubscribableChannel input ();
79
+
80
+ }
81
+
82
+ }
0 commit comments