Skip to content

Commit e4e743d

Browse files
authored
Merge pull request #97 from reactive-commons/feature/eda-update
feat(eda): Add eda properties, allow different queue and exchange names disallow topology creation
2 parents 8a90375 + 920fbc9 commit e4e743d

File tree

110 files changed

+1508
-899
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+1508
-899
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ dependencies {
77
api project(':domain-events-api')
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
10-
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
10+
implementation "io.cloudevents:cloudevents-json-jackson:${cloudEventsVersion}"
1111
}

async/async-commons/src/main/java/org/reactivecommons/async/commons/config/IBrokerConfigProps.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@
33
public interface IBrokerConfigProps {
44
String getEventsQueue();
55

6+
String getNotificationsQueue();
7+
68
String getQueriesQueue();
79

810
String getCommandsQueue();
911

1012
String getReplyQueue();
1113

12-
String getAppName();
13-
1414
String getDomainEventsExchangeName();
1515

1616
String getDirectMessagesExchangeName();
1717

18-
java.util.concurrent.atomic.AtomicReference<String> getReplyQueueName();
18+
String getGlobalReplyExchangeName();
19+
20+
String getAppName();
1921
}

async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/NameGenerator.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,19 @@
66

77
public class NameGenerator {
88

9+
public static String fromNameWithSuffix(String appName, String suffix) {
10+
if (suffix != null && !suffix.isEmpty()) {
11+
return appName + "." + suffix;
12+
}
13+
return appName;
14+
}
15+
916
public static String generateNameFrom(String applicationName, String suffix) {
10-
return generateName(applicationName,suffix);
17+
return generateName(applicationName, suffix);
1118
}
1219

1320
public static String generateNameFrom(String applicationName) {
14-
return generateName(applicationName,"");
21+
return generateName(applicationName, "");
1522
}
1623

1724
private static String generateName(String applicationName, String suffix) {
@@ -21,7 +28,7 @@ private static String generateName(String applicationName, String suffix) {
2128
.putLong(uuid.getLeastSignificantBits());
2229
// Convert to base64 and remove trailing =
2330
String realSuffix = suffix != null && !"".equals(suffix) ? suffix + "." : "";
24-
return applicationName+"."+ realSuffix + encodeToUrlSafeString(bb.array())
31+
return applicationName + "." + realSuffix + encodeToUrlSafeString(bb.array())
2532
.replace("=", "");
2633
}
2734

async/async-commons/src/test/java/org/reactivecommons/async/commons/converters/json/DefaultObjectMapperSupplierTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ class DefaultObjectMapperSupplierTest {
1818
void shouldMapWithUnknownProperties() throws IOException {
1919
ObjectMapper objectMapper = defaultObjectMapperSupplier.get();
2020

21-
SampleClassExtra base = new SampleClassExtra("23", "one", new Date(), 45l);
21+
SampleClassExtra base = new SampleClassExtra("23", "one", new Date(), 45L);
2222
final String serialized = objectMapper.writeValueAsString(base);
2323

2424
final SampleClass result = objectMapper.readValue(serialized, SampleClass.class);
2525

26-
assertThat(result).isEqualToComparingFieldByField(base);
26+
assertThat(result).usingRecursiveComparison().isEqualTo(base);
2727
}
2828

2929
@Getter

async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,34 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import io.micrometer.core.instrument.MeterRegistry;
4+
import lombok.AllArgsConstructor;
5+
import org.reactivecommons.async.commons.config.BrokerConfig;
6+
import org.reactivecommons.async.commons.converters.MessageConverter;
7+
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
48
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
59
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
610
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
7-
import org.reactivecommons.async.commons.converters.MessageConverter;
811
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
9-
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
10-
import org.reactivecommons.async.commons.config.BrokerConfig;
1112

1213
import java.nio.ByteBuffer;
1314
import java.util.Base64;
1415
import java.util.UUID;
1516

17+
@AllArgsConstructor
1618
public class DirectAsyncGatewayConfig {
1719

1820
private String directMessagesExchangeName;
21+
private String globalReplyExchangeName;
1922
private String appName;
2023

21-
public DirectAsyncGatewayConfig(String directMessagesExchangeName, String appName) {
22-
this.directMessagesExchangeName = directMessagesExchangeName;
23-
this.appName = appName;
24-
}
2524

2625
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter,
2726
MeterRegistry meterRegistry) throws Exception {
2827
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter, meterRegistry);
2928
}
3029

31-
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
32-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, generateName());
30+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener, boolean createTopology) {
31+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, generateName(), globalReplyExchangeName, createTopology);
3332
replyListener.startListening(config.getRoutingKey());
3433
return replyListener;
3534
}
@@ -48,10 +47,10 @@ public String generateName() {
4847
UUID uuid = UUID.randomUUID();
4948
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
5049
bb.putLong(uuid.getMostSignificantBits())
51-
.putLong(uuid.getLeastSignificantBits());
50+
.putLong(uuid.getLeastSignificantBits());
5251
// Convert to base64 and remove trailing =
5352
return this.appName + encodeToUrlSafeString(bb.array())
54-
.replaceAll("=", "");
53+
.replaceAll("=", "");
5554
}
5655

5756
public static String encodeToUrlSafeString(byte[] src) {

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
44
import org.reactivecommons.async.rabbit.config.EventListenersConfig;
5-
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
5+
import org.reactivecommons.async.rabbit.config.NotificationListenersConfig;
66
import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
77
import org.springframework.context.annotation.Configuration;
88
import org.springframework.context.annotation.Import;
@@ -17,7 +17,7 @@
1717
@Retention(RetentionPolicy.RUNTIME)
1818
@Target({ElementType.TYPE})
1919
@Documented
20-
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
20+
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificationListenersConfig.class})
2121
@Configuration
2222
public @interface EnableMessageListeners {
2323
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableNotificationListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.reactivecommons.async.impl.config.annotations;
22

3-
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
3+
import org.reactivecommons.async.rabbit.config.NotificationListenersConfig;
44
import org.springframework.context.annotation.Configuration;
55
import org.springframework.context.annotation.Import;
66

@@ -10,7 +10,7 @@
1010
@Retention(RetentionPolicy.RUNTIME)
1111
@Target({ElementType.TYPE})
1212
@Documented
13-
@Import(NotificacionListenersConfig.class)
13+
@Import(NotificationListenersConfig.class)
1414
@Configuration
1515
public @interface EnableNotificationListener {
1616
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import org.reactivecommons.async.commons.converters.MessageConverter;
55
import org.reactivecommons.async.commons.ext.CustomReporter;
66
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
7+
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
78
import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener;
8-
import org.springframework.beans.factory.annotation.Value;
99
import org.springframework.context.annotation.Bean;
1010
import org.springframework.context.annotation.Configuration;
1111
import org.springframework.context.annotation.Import;
@@ -16,20 +16,20 @@
1616
@RequiredArgsConstructor
1717
@Import(RabbitMqConfig.class)
1818
public class CommandListenersConfig {
19-
20-
@Value("${spring.application.name}")
21-
private String appName;
22-
23-
private final AsyncProps asyncProps;
19+
private final AsyncPropsDomain asyncPropsDomain;
2420

2521
@Bean
2622
public ApplicationCommandListener applicationCommandListener(ConnectionManager manager,
2723
DomainHandlers handlers,
2824
MessageConverter converter,
2925
CustomReporter errorReporter) {
30-
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN),
31-
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
32-
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
26+
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
27+
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN),
28+
asyncProps.getBrokerConfigProps().getCommandsQueue(), handlers.get(DEFAULT_DOMAIN),
29+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(),
30+
asyncProps.getCreateTopology(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
31+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(),
32+
manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
3333

3434
commandListener.startListener();
3535

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,66 @@
33
import io.micrometer.core.instrument.MeterRegistry;
44
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
55
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.java.Log;
67
import org.reactivecommons.async.RabbitEDADirectAsyncGateway;
78
import org.reactivecommons.async.commons.config.BrokerConfig;
89
import org.reactivecommons.async.commons.converters.MessageConverter;
910
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
1011
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
12+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1113
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
12-
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
14+
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
1315
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
1416
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1517
import org.springframework.context.annotation.Bean;
1618
import org.springframework.context.annotation.Configuration;
1719
import org.springframework.context.annotation.Import;
1820

1921
import java.util.concurrent.atomic.AtomicReference;
22+
import java.util.logging.Level;
2023

2124
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
25+
import static reactor.rabbitmq.ExchangeSpecification.exchange;
2226

27+
@Log
2328
@Configuration
2429
@Import(RabbitMqConfig.class)
2530
@RequiredArgsConstructor
2631
public class DirectAsyncGatewayConfig {
2732

28-
private final BrokerConfigProps props;
29-
3033
@Bean
31-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
32-
return new RabbitEDADirectAsyncGateway(config, router, manager, props.getDirectMessagesExchangeName(), converter, meterRegistry);
34+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,
35+
ConnectionManager manager, MessageConverter converter,
36+
MeterRegistry meterRegistry,
37+
AsyncPropsDomain asyncPropsDomain) {
38+
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
39+
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
40+
String exchangeName = asyncProps.getBrokerConfigProps().getDirectMessagesExchangeName();
41+
if (asyncProps.getCreateTopology()) {
42+
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("direct")).subscribe();
43+
}
44+
return new RabbitEDADirectAsyncGateway(config, router, manager, exchangeName, converter, meterRegistry);
3345
}
3446

3547
@Bean
36-
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncProps asyncProps, BrokerConfig config, ConnectionManager manager) {
37-
asyncProps.getListenRepliesFrom().add(DEFAULT_DOMAIN);
48+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncPropsDomain asyncProps,
49+
BrokerConfig config, ConnectionManager manager) {
3850
AtomicReference<ApplicationReplyListener> localListener = new AtomicReference<>();
3951

40-
asyncProps.getListenRepliesFrom().forEach(domain -> {
41-
42-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(domain), props.getReplyQueue());
43-
replyListener.startListening(config.getRoutingKey());
52+
asyncProps.forEach((domain, props) -> {
53+
if (props.isListenReplies()) {
54+
final ApplicationReplyListener replyListener =
55+
new ApplicationReplyListener(router, manager.getListener(domain),
56+
props.getBrokerConfigProps().getReplyQueue(),
57+
props.getBrokerConfigProps().getGlobalReplyExchangeName(), props.getCreateTopology());
58+
replyListener.startListening(config.getRoutingKey());
4459

45-
if (DEFAULT_DOMAIN.equals(domain)) {
46-
localListener.set(replyListener);
60+
if (DEFAULT_DOMAIN.equals(domain)) {
61+
localListener.set(replyListener);
62+
}
63+
} else {
64+
log.log(Level.WARNING,"ApplicationReplyListener is disabled in AsyncProps or app.async." + domain
65+
+ ".listenReplies for domain " + domain);
4766
}
4867
});
4968

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
88
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
99
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
10-
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
10+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
11+
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
1112
import org.springframework.context.annotation.Bean;
1213
import org.springframework.context.annotation.Configuration;
1314
import org.springframework.context.annotation.Import;
@@ -20,11 +21,14 @@
2021
public class EventBusConfig {
2122

2223
@Bean // app connection
23-
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config,
24-
ObjectMapperSupplier objectMapperSupplier) {
24+
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfig config,
25+
AsyncPropsDomain asyncPropsDomain, ObjectMapperSupplier objectMapperSupplier) {
2526
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
26-
final String exchangeName = props.getDomainEventsExchangeName();
27-
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
27+
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
28+
final String exchangeName = asyncProps.getBrokerConfigProps().getDomainEventsExchangeName();
29+
if (asyncProps.getCreateTopology()) {
30+
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
31+
}
2832
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
2933
manager.setDiscardNotifier(DEFAULT_DOMAIN, createDiscardNotifier(domainEventBus, objectMapperSupplier));
3034
return domainEventBus;

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import org.reactivecommons.async.commons.converters.MessageConverter;
55
import org.reactivecommons.async.commons.ext.CustomReporter;
66
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
7+
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
78
import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener;
8-
import org.springframework.beans.factory.annotation.Value;
99
import org.springframework.context.annotation.Bean;
1010
import org.springframework.context.annotation.Configuration;
1111
import org.springframework.context.annotation.Import;
@@ -19,28 +19,27 @@
1919
@Import(RabbitMqConfig.class)
2020
public class EventListenersConfig {
2121

22-
@Value("${spring.application.name}")
23-
private String appName;
24-
25-
private final AsyncProps asyncProps;
22+
private final AsyncPropsDomain asyncPropsDomain;
2623

2724
@Bean
2825
public ApplicationEventListener eventListener(MessageConverter messageConverter,
2926
ConnectionManager manager, DomainHandlers handlers,
3027
CustomReporter errorReporter) {
3128
AtomicReference<ApplicationEventListener> external = new AtomicReference<>();
3229
manager.forListener((domain, receiver) -> {
30+
AsyncProps asyncProps = asyncPropsDomain.getProps(domain);
3331
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
34-
appName + ".subsEvents",
32+
asyncProps.getBrokerConfigProps().getEventsQueue(),
33+
asyncProps.getBrokerConfigProps().getDomainEventsExchangeName(),
3534
handlers.get(domain),
36-
asyncProps.getDomain().getEvents().getExchange(),
3735
messageConverter, asyncProps.getWithDLQRetry(),
36+
asyncProps.getCreateTopology(),
3837
asyncProps.getMaxRetries(),
3938
asyncProps.getRetryDelay(),
4039
asyncProps.getDomain().getEvents().getMaxLengthBytes(),
4140
manager.getDiscardNotifier(domain),
4241
errorReporter,
43-
appName);
42+
asyncProps.getAppName());
4443
if (DEFAULT_DOMAIN.equals(domain)) {
4544
external.set(listener);
4645
}

0 commit comments

Comments
 (0)