Skip to content

Commit b862461

Browse files
authored
Merge pull request #92 from reactive-commons/feature/delayed-command
Add delayed command capability
2 parents 1cb8290 + 43bd7af commit b862461

File tree

15 files changed

+110
-49
lines changed

15 files changed

+110
-49
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@
55
import reactor.core.publisher.Mono;
66

77
public interface DirectAsyncGateway {
8+
public static final String DELAYED = "rc-delay";
9+
810
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
911

12+
<T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis);
13+
1014
<T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain);
1115

16+
<T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain);
17+
1218
Mono<Void> sendCommand(CloudEvent command, String targetName);
1319

1420
Mono<Void> sendCommand(CloudEvent command, String targetName, String domain);

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public ApplicationCommandListener applicationCommandListener(ConnectionManager m
2828
MessageConverter converter,
2929
CustomReporter errorReporter) {
3030
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN),
31-
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
31+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
3232
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
3333

3434
commandListener.startListener();

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,6 @@ public class AsyncProps {
4444
private Integer retryDelay = 1000;
4545

4646
private Boolean withDLQRetry = false;
47+
private Boolean delayedCommands = false;
4748

4849
}

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
2929
DiscardNotifier discardNotifier,
3030
CustomReporter errorReporter) {
3131
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
32-
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
32+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
3333
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);
3434

3535
commandListener.startListener();

async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ public class AsyncProps {
3333
private Integer retryDelay = 1000;
3434

3535
private Boolean withDLQRetry = false;
36+
private Boolean delayedCommands = false;
3637
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,28 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,
5959

6060
@Override
6161
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
62-
return sendCommand(command, targetName, DEFAULT_DOMAIN);
62+
return sendCommand(command, targetName, 0, DEFAULT_DOMAIN);
63+
}
64+
65+
@Override
66+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
67+
return sendCommand(command, targetName, delayMillis, DEFAULT_DOMAIN);
6368
}
6469

6570
@Override
6671
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
67-
return resolveSender(domain).sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
72+
return sendCommand(command, targetName, 0, domain);
73+
}
74+
75+
@Override
76+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
77+
Map<String, Object> headers = new HashMap<>();
78+
String realTarget = targetName;
79+
if (delayMillis > 0) {
80+
headers.put(DELAYED, String.valueOf(delayMillis));
81+
realTarget = targetName + "-delayed";
82+
}
83+
return resolveSender(domain).sendWithConfirm(command, exchange, realTarget, headers, persistentCommands);
6884
}
6985

7086
@Override

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.reactivecommons.async.rabbit.communications;
22

33
import com.rabbitmq.client.AMQP;
4+
import org.reactivecommons.async.commons.communications.Message;
45
import org.reactivecommons.async.commons.converters.MessageConverter;
56
import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException;
6-
import org.reactivecommons.async.commons.communications.Message;
7-
import reactor.core.publisher.*;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.FluxSink;
9+
import reactor.core.publisher.Mono;
10+
import reactor.core.publisher.MonoSink;
811
import reactor.rabbitmq.OutboundMessage;
912
import reactor.rabbitmq.OutboundMessageResult;
1013
import reactor.rabbitmq.Sender;
@@ -13,14 +16,16 @@
1316
import java.util.HashMap;
1417
import java.util.Map;
1518
import java.util.UUID;
16-
import java.util.concurrent.*;
19+
import java.util.concurrent.CopyOnWriteArrayList;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
1722
import java.util.concurrent.atomic.AtomicLong;
1823
import java.util.function.Consumer;
1924

25+
import static org.reactivecommons.async.api.DirectAsyncGateway.DELAYED;
2026
import static org.reactivecommons.async.commons.Headers.SOURCE_APPLICATION;
2127

2228
public class ReactiveMessageSender {
23-
2429
private final Sender sender;
2530
private final String sourceApplication;
2631
private final MessageConverter messageConverter;
@@ -61,7 +66,7 @@ public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routing
6166
return Mono.create(monoSink -> {
6267
Consumer<Boolean> notifier = new AckNotifier(monoSink);
6368
final MyOutboundMessage outboundMessage = toOutboundMessage(message, exchange, routingKey, headers, notifier, persistent);
64-
executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis()%numberOfSenderSubscriptions)).next(outboundMessage));
69+
executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis() % numberOfSenderSubscriptions)).next(outboundMessage));
6570
});
6671
}
6772

@@ -73,11 +78,11 @@ public <T> Mono<Void> sendNoConfirm(T message, String exchange, String routingKe
7378

7479
public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> messages, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
7580
return messages.map(message -> toOutboundMessage(message, exchange, routingKey, headers, persistent))
76-
.as(sender::sendWithPublishConfirms)
77-
.flatMap(result -> result.isAck() ?
78-
Mono.empty() :
79-
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
80-
);
81+
.as(sender::sendWithPublishConfirms)
82+
.flatMap(result -> result.isAck() ?
83+
Mono.empty() :
84+
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
85+
);
8186
}
8287

8388
private static class AckNotifier implements Consumer<Boolean> {
@@ -98,8 +103,7 @@ public void accept(Boolean ack) {
98103
}
99104

100105

101-
102-
static class MyOutboundMessage extends OutboundMessage{
106+
static class MyOutboundMessage extends OutboundMessage {
103107

104108
private final Consumer<Boolean> ackNotifier;
105109

@@ -130,14 +134,18 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map<String,
130134
final Map<String, Object> baseHeaders = new HashMap<>(properties.getHeaders());
131135
baseHeaders.putAll(headers);
132136
baseHeaders.put(SOURCE_APPLICATION, sourceApplication);
133-
return new AMQP.BasicProperties.Builder()
137+
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder()
134138
.contentType(properties.getContentType())
135139
.appId(sourceApplication)
136140
.contentEncoding(properties.getContentEncoding())
137141
.deliveryMode(persistent ? 2 : 1)
138142
.timestamp(new Date())
139143
.messageId(UUID.randomUUID().toString())
140-
.headers(baseHeaders).build();
144+
.headers(baseHeaders);
145+
if (headers.containsKey(DELAYED)) {
146+
builder.expiration((String) headers.get(DELAYED));
147+
}
148+
return builder.build();
141149
}
142150

143151
public reactor.rabbitmq.Sender getSender() {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,15 @@ public Mono<AMQP.Queue.DeclareOk> declareDLQ(String originQueue, String retryTar
5656
}
5757

5858
public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, String dlqExchange, Optional<Integer> maxLengthBytesOpt) {
59+
return declareQueue(name, dlqExchange, maxLengthBytesOpt, Optional.empty());
60+
}
61+
62+
public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, String dlqExchange, Optional<Integer> maxLengthBytesOpt,
63+
Optional<String> dlRoutingKey) {
5964
final Map<String, Object> args = new HashMap<>();
6065
args.put("x-dead-letter-exchange", dlqExchange);
6166
maxLengthBytesOpt.ifPresent(maxLengthBytes -> args.put("x-max-length-bytes", maxLengthBytes));
67+
dlRoutingKey.ifPresent(routingKey -> args.put("x-dead-letter-routing-key", routingKey));
6268
QueueSpecification specification = QueueSpecification.queue(name)
6369
.durable(true)
6470
.arguments(args);

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,20 @@
33
import com.rabbitmq.client.AMQP;
44
import lombok.extern.java.Log;
55
import org.reactivecommons.api.domain.Command;
6-
import org.reactivecommons.async.commons.communications.Message;
7-
import org.reactivecommons.async.commons.converters.MessageConverter;
8-
import org.reactivecommons.async.commons.DiscardNotifier;
96
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
107
import org.reactivecommons.async.commons.CommandExecutor;
8+
import org.reactivecommons.async.commons.DiscardNotifier;
9+
import org.reactivecommons.async.commons.communications.Message;
10+
import org.reactivecommons.async.commons.converters.MessageConverter;
11+
import org.reactivecommons.async.commons.ext.CustomReporter;
1112
import org.reactivecommons.async.rabbit.HandlerResolver;
1213
import org.reactivecommons.async.rabbit.RabbitMessage;
1314
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1415
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
15-
import org.reactivecommons.async.commons.ext.CustomReporter;
1616
import reactor.core.publisher.Mono;
1717
import reactor.rabbitmq.AcknowledgableDelivery;
1818
import reactor.rabbitmq.BindingSpecification;
1919
import reactor.rabbitmq.ExchangeSpecification;
20-
import reactor.rabbitmq.QueueSpecification;
2120

2221
import java.util.Optional;
2322
import java.util.function.Function;
@@ -29,18 +28,20 @@ public class ApplicationCommandListener extends GenericMessageListener {
2928
private final HandlerResolver resolver;
3029
private final String directExchange;
3130
private final boolean withDLQRetry;
31+
private final boolean delayedCommands;
3232
private final int retryDelay;
3333
private final Optional<Integer> maxLengthBytes;
3434

3535
//TODO: change large constructor parameters number
36-
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
36+
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, boolean delayedCommands, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
3737
super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "command", errorReporter);
3838
this.retryDelay = retryDelay;
3939
this.withDLQRetry = withDLQRetry;
40+
this.delayedCommands = delayedCommands;
4041
this.resolver = resolver;
4142
this.directExchange = directExchange;
4243
this.messageConverter = messageConverter;
43-
this.maxLengthBytes =maxLengthBytes;
44+
this.maxLengthBytes = maxLengthBytes;
4445
}
4546

4647
protected Mono<Void> setUpBindings(TopologyCreator creator) {
@@ -51,12 +52,28 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
5152
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay, maxLengthBytes);
5253
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
5354
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange + ".DLQ", queueName, queueName + ".DLQ"));
54-
return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then();
55+
return declareExchange.then(declareExchangeDLQ)
56+
.then(declareDLQ)
57+
.then(declareQueue)
58+
.then(bindingDLQ)
59+
.then(binding)
60+
.then(declareDelayedTopology(creator))
61+
.then();
5562
} else {
5663
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, maxLengthBytes);
5764
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
58-
return declareExchange.then(declareQueue).then(binding).then();
65+
return declareExchange.then(declareQueue).then(binding).then(declareDelayedTopology(creator)).then();
66+
}
67+
}
68+
69+
private Mono<Void> declareDelayedTopology(TopologyCreator creator) {
70+
if (delayedCommands) {
71+
String delayedQueue = queueName + "-delayed";
72+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(delayedQueue, directExchange, maxLengthBytes, Optional.of(queueName));
73+
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, delayedQueue, delayedQueue));
74+
return declareQueue.then(binding).then();
5975
}
76+
return Mono.empty();
6077
}
6178

6279

@@ -75,7 +92,7 @@ protected String getExecutorPath(AcknowledgableDelivery msj) {
7592
}
7693

7794
@Override
78-
protected Object parseMessageForReporter(Message message){
95+
protected Object parseMessageForReporter(Message message) {
7996
return messageConverter.readCommandStructure(message);
8097
}
8198

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ private AMQP.BasicProperties createProps() {
322322
class StubGenericMessageListener extends ApplicationCommandListener {
323323

324324
public StubGenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries, long maxRetries, DiscardNotifier discardNotifier, String objectType, HandlerResolver handlerResolver, MessageConverter messageConverter, CustomReporter errorReporter) {
325-
super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter);
325+
super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, false, 10, 10, Optional.empty(), discardNotifier, errorReporter);
326326
}
327327

328328
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected GenericMessageListener createMessageListener(HandlerResolver handlerRe
5454
class StubGenericMessageListener extends ApplicationCommandListener {
5555

5656
public StubGenericMessageListener(HandlerResolver handlerResolver) {
57-
super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter);
57+
super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, false,10, 10, Optional.empty(), discardNotifier, errorReporter);
5858
}
5959
}
6060
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=2.0.3
1+
version=2.0.4
22
springBootVersion=3.0.2
33
reactorRabbitVersion=1.5.5
44
gradleVersionsVersion=0.36.0

samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/UseCase
7171
// }, Call.class);
7272

7373
// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
74-
.listenEvent("event", message -> {
75-
log.info(message.getData().toString());
76-
return useCase.sendCommand(message.getData());
77-
}, CloudEvent.class)
78-
.handleCommand("command", message -> {
79-
log.info(message.getData().toString());
74+
// .listenEvent("event", message -> {
75+
// log.info(message.getData().toString());
76+
// return useCase.sendCommand(message.getData());
77+
// }, CloudEvent.class)
78+
.handleCommand("unlock", message -> {
79+
log.info(message.getData());
8080
return Mono.empty();
81-
}, CloudEvent.class)
81+
}, String.class)
8282
// .serveQuery("query1", message -> {
8383
// log.info("resolving from direct query" + message);
8484
// Map<String, String> mapData = Map.of("1", "data");

samples/async/receiver-responder/src/main/resources/application.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ spring:
66
app:
77
async:
88
max-retries: 10
9-
with-d-l-q-retry: true
9+
# with-d-l-q-retry: true
10+
delayed-commands: true
1011
retry-delay: 1000 # son milisegundos
1112
# connections:
1213
# app:

0 commit comments

Comments
 (0)