Skip to content

Commit a5dd994

Browse files
committed
Mend Spring Cloud Stream configuration
1 parent a3ee002 commit a5dd994

File tree

28 files changed

+518
-309
lines changed

28 files changed

+518
-309
lines changed

10/part1/comments/src/main/java/com/greglturnquist/learningspringboot/comments/CommentService.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
import org.springframework.boot.CommandLineRunner;
2121
import org.springframework.boot.actuate.metrics.CounterService;
2222
import org.springframework.cloud.stream.annotation.EnableBinding;
23+
import org.springframework.cloud.stream.annotation.Input;
24+
import org.springframework.cloud.stream.annotation.Output;
2325
import org.springframework.cloud.stream.annotation.StreamListener;
24-
import org.springframework.cloud.stream.messaging.Sink;
25-
import org.springframework.cloud.stream.messaging.Source;
26+
import org.springframework.cloud.stream.messaging.Processor;
2627
import org.springframework.context.annotation.Bean;
27-
import org.springframework.messaging.handler.annotation.SendTo;
2828
import org.springframework.stereotype.Service;
2929

3030
/**
3131
* @author Greg Turnquist
3232
*/
3333
// tag::stream-1[]
3434
@Service
35-
@EnableBinding({Sink.class, Source.class})
35+
@EnableBinding(Processor.class)
3636
public class CommentService {
3737
// end::stream-1[]
3838

@@ -47,9 +47,9 @@ public CommentService(CommentRepository repository,
4747
}
4848

4949
// tag::stream-2[]
50-
@StreamListener(Sink.INPUT)
51-
@SendTo(Source.OUTPUT)
52-
public Flux<Comment> save(Flux<Comment> newComment) {
50+
@StreamListener
51+
@Output(Processor.OUTPUT)
52+
public Flux<Comment> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
5353
return repository
5454
.saveAll(newComment)
5555
.map(comment -> {

10/part1/images/src/main/java/com/greglturnquist/learningspringboot/CommentSimulator.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.util.concurrent.atomic.AtomicInteger;
1919

20+
import reactor.core.publisher.Mono;
21+
2022
import org.springframework.context.annotation.Profile;
2123
import org.springframework.scheduling.annotation.Scheduled;
2224
import org.springframework.stereotype.Component;
@@ -51,13 +53,16 @@ public CommentSimulator(HomeController homeController,
5153

5254
@Scheduled(fixedRate = 100)
5355
public void simulateActivity() {
54-
repository.findAll().map(image -> {
55-
Comment comment = new Comment();
56-
comment.setImageId(image.getId());
57-
comment.setComment(
58-
"Comment #" + counter.getAndIncrement());
59-
return commentController.addComment(comment);
60-
})
56+
repository
57+
.findAll()
58+
.map(image -> {
59+
Comment comment = new Comment();
60+
comment.setImageId(image.getId());
61+
comment.setComment(
62+
"Comment #" + counter.getAndIncrement());
63+
return Mono.just(comment);
64+
})
65+
.map(commentController::addComment)
6166
.subscribe();
6267
}
6368

10/part1/images/src/main/java/com/greglturnquist/learningspringboot/images/CommentController.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515
*/
1616
package com.greglturnquist.learningspringboot.images;
1717

18-
import org.slf4j.Logger;
19-
import org.slf4j.LoggerFactory;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.FluxSink;
2020
import reactor.core.publisher.Mono;
2121

2222
import org.springframework.boot.actuate.metrics.CounterService;
2323
import org.springframework.cloud.stream.annotation.EnableBinding;
24+
import org.springframework.cloud.stream.annotation.Output;
2425
import org.springframework.cloud.stream.messaging.Source;
26+
import org.springframework.cloud.stream.reactive.FluxSender;
27+
import org.springframework.cloud.stream.reactive.StreamEmitter;
2528
import org.springframework.http.MediaType;
2629
import org.springframework.http.ResponseEntity;
30+
import org.springframework.messaging.Message;
2731
import org.springframework.messaging.MessageHeaders;
2832
import org.springframework.messaging.support.MessageBuilder;
2933
import org.springframework.web.bind.annotation.PostMapping;
@@ -32,38 +36,50 @@
3236
/**
3337
* @author Greg Turnquist
3438
*/
35-
// tag::code[]
3639
@RestController
3740
@EnableBinding(Source.class)
3841
public class CommentController {
3942

40-
private final static Logger log = LoggerFactory.getLogger(CommentController.class);
41-
42-
private final Source source;
43-
4443
private final CounterService counterService;
44+
private FluxSink<Message<Comment>> commentSink;
45+
private Flux<Message<Comment>> flux;
4546

46-
public CommentController(Source source,
47-
CounterService counterService) {
48-
this.source = source;
47+
public CommentController(CounterService counterService) {
4948
this.counterService = counterService;
49+
this.flux = Flux.<Message<Comment>>create(
50+
emitter -> this.commentSink = emitter,
51+
FluxSink.OverflowStrategy.IGNORE)
52+
.publish()
53+
.autoConnect();
5054
}
5155

5256
@PostMapping("/comments")
53-
public Mono<ResponseEntity<?>> addComment(Comment newComment) {
54-
return Mono.fromRunnable(() -> source.output().send(
55-
MessageBuilder
56-
.withPayload(newComment)
57-
.setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
58-
.build())
59-
)
60-
.map(aVoid -> {
61-
counterService.increment("comments.total.produced");
62-
counterService.increment(
63-
"comments." + newComment.getImageId() + ".produced");
64-
return ResponseEntity.noContent().build();
65-
});
57+
public Mono<ResponseEntity<?>> addComment(Mono<Comment> newComment) {
58+
if (commentSink != null) {
59+
return newComment
60+
.map(comment -> {
61+
commentSink.next(MessageBuilder
62+
.withPayload(comment)
63+
.setHeader(MessageHeaders.CONTENT_TYPE,
64+
MediaType.APPLICATION_JSON_VALUE)
65+
.build());
66+
return comment;
67+
})
68+
.flatMap(comment -> {
69+
counterService.increment("comments.total.produced");
70+
counterService.increment(
71+
"comments." + comment.getImageId() + ".produced");
72+
return Mono.just(ResponseEntity.noContent().build());
73+
});
74+
} else {
75+
return Mono.just(ResponseEntity.noContent().build());
76+
}
77+
}
78+
79+
@StreamEmitter
80+
@Output(Source.OUTPUT)
81+
public void emit(FluxSender output) {
82+
output.send(this.flux);
6683
}
6784

6885
}
69-
// end::code[]

6/part2/src/main/java/com/greglturnquist/learningspringboot/CommentSimulator.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,17 @@ public CommentSimulator(CommentController controller,
4949

5050
@Scheduled(fixedRate = 100)
5151
public void simulateActivity() {
52-
repository.findAll().map(image -> {
53-
Comment comment = new Comment();
54-
comment.setImageId(image.getId());
55-
comment.setComment(
56-
"Comment #" + counter.getAndIncrement());
57-
return controller.addComment(Mono.just(comment));
58-
})
59-
.subscribe();
52+
repository
53+
.findAll()
54+
.map(image -> {
55+
Comment comment = new Comment();
56+
comment.setImageId(image.getId());
57+
comment.setComment(
58+
"Comment #" + counter.getAndIncrement());
59+
return Mono.just(comment);
60+
})
61+
.map(controller::addComment)
62+
.subscribe();
6063
}
6164
}
6265
// end::tag[]

6/part3/src/main/java/com/greglturnquist/learningspringboot/CommentSimulator.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.util.concurrent.atomic.AtomicInteger;
1919

20+
import reactor.core.publisher.Mono;
21+
2022
import org.springframework.context.annotation.Profile;
2123
import org.springframework.scheduling.annotation.Scheduled;
2224
import org.springframework.stereotype.Component;
@@ -47,14 +49,17 @@ public CommentSimulator(CommentController controller,
4749

4850
@Scheduled(fixedRate = 100)
4951
public void simulateActivity() {
50-
repository.findAll().map(image -> {
51-
Comment comment = new Comment();
52-
comment.setImageId(image.getId());
53-
comment.setComment(
54-
"Comment #" + counter.getAndIncrement());
55-
return controller.addComment(comment);
56-
})
57-
.subscribe();
52+
repository
53+
.findAll()
54+
.map(image -> {
55+
Comment comment = new Comment();
56+
comment.setImageId(image.getId());
57+
comment.setComment(
58+
"Comment #" + counter.getAndIncrement());
59+
return Mono.just(comment);
60+
})
61+
.map(controller::addComment)
62+
.subscribe();
5863
}
5964
}
6065
// end::tag[]

6/part3/src/main/java/com/greglturnquist/learningspringboot/comments/CommentController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ public CommentController(CounterService counterService) {
4949
FluxSink.OverflowStrategy.IGNORE)
5050
.publish()
5151
.autoConnect();
52-
5352
}
5453

5554
@PostMapping("/comments")
56-
public Mono<String> addComment(Comment newComment) {
55+
public Mono<String> addComment(Mono<Comment> newComment) {
5756
if (commentSink != null) {
58-
return Mono.fromRunnable(() -> commentSink.next(MessageBuilder
59-
.withPayload(newComment)
60-
.build()))
57+
return newComment
58+
.map(comment -> commentSink.next(MessageBuilder
59+
.withPayload(comment)
60+
.build()))
6161
.then(Mono.just("redirect:/"));
6262
} else {
6363
return Mono.just("redirect:/");

6/part3/src/main/java/com/greglturnquist/learningspringboot/comments/CommentService.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,16 @@ public CommentService(CommentWriterRepository repository,
5050
// tag::stream-2[]
5151
@StreamListener
5252
@Output(Processor.OUTPUT)
53-
public Mono<Void> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
53+
public Flux<Void> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
5454
return repository
5555
.saveAll(newComment)
56-
.map(comment -> {
56+
.flatMap(comment -> {
5757
counterService.increment(
5858
"comments.total.consumed");
5959
counterService.increment(
6060
"comments." + comment.getImageId() + ".consumed");
61-
return comment;
62-
})
63-
.then();
61+
return Mono.empty();
62+
});
6463
}
6564
// end::stream-2[]
6665

7/part1/comments/src/main/java/com/greglturnquist/learningspringboot/comments/CommentService.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import org.springframework.boot.CommandLineRunner;
2222
import org.springframework.boot.actuate.metrics.CounterService;
2323
import org.springframework.cloud.stream.annotation.EnableBinding;
24-
import org.springframework.cloud.stream.messaging.Sink;
24+
import org.springframework.cloud.stream.annotation.Input;
25+
import org.springframework.cloud.stream.annotation.Output;
26+
import org.springframework.cloud.stream.annotation.StreamListener;
27+
import org.springframework.cloud.stream.messaging.Processor;
2528
import org.springframework.context.annotation.Bean;
2629
import org.springframework.stereotype.Service;
2730

@@ -30,7 +33,7 @@
3033
*/
3134
// tag::stream-1[]
3235
@Service
33-
@EnableBinding(Sink.class)
36+
@EnableBinding(Processor.class)
3437
public class CommentService {
3538
// end::stream-1[]
3639

@@ -45,17 +48,18 @@ public CommentService(CommentRepository repository,
4548
}
4649

4750
// tag::stream-2[]
48-
public Mono<Void> save(Flux<Comment> newComment) {
51+
@StreamListener
52+
@Output(Processor.OUTPUT)
53+
public Flux<Void> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
4954
return repository
5055
.saveAll(newComment)
51-
.map(comment -> {
56+
.flatMap(comment -> {
5257
counterService.increment(
5358
"comments.total.consumed");
5459
counterService.increment(
5560
"comments." + comment.getImageId() + ".consumed");
56-
return comment;
57-
})
58-
.then();
61+
return Mono.empty();
62+
});
5963
}
6064
// end::stream-2[]
6165

7/part1/images/src/main/java/com/greglturnquist/learningspringboot/CommentSimulator.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.util.concurrent.atomic.AtomicInteger;
1919

20+
import reactor.core.publisher.Mono;
21+
2022
import org.springframework.context.annotation.Profile;
2123
import org.springframework.scheduling.annotation.Scheduled;
2224
import org.springframework.stereotype.Component;
@@ -47,13 +49,16 @@ public CommentSimulator(CommentController controller,
4749

4850
@Scheduled(fixedRate = 100)
4951
public void simulateActivity() {
50-
repository.findAll().map(image -> {
51-
Comment comment = new Comment();
52-
comment.setImageId(image.getId());
53-
comment.setComment(
54-
"Comment #" + counter.getAndIncrement());
55-
return controller.addComment(comment);
56-
})
52+
repository
53+
.findAll()
54+
.map(image -> {
55+
Comment comment = new Comment();
56+
comment.setImageId(image.getId());
57+
comment.setComment(
58+
"Comment #" + counter.getAndIncrement());
59+
return Mono.just(comment);
60+
})
61+
.map(controller::addComment)
5762
.subscribe();
5863
}
5964
}

0 commit comments

Comments
 (0)