Skip to content

feat(eda): Add eda properties, allow different queue and exchange names disallow topology creation #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async/async-commons-api/async-commons-api.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ dependencies {
api project(':domain-events-api')
compileOnly 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
implementation "io.cloudevents:cloudevents-json-jackson:${cloudEventsVersion}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
public interface IBrokerConfigProps {
String getEventsQueue();

String getNotificationsQueue();

String getQueriesQueue();

String getCommandsQueue();

String getReplyQueue();

String getAppName();

String getDomainEventsExchangeName();

String getDirectMessagesExchangeName();

java.util.concurrent.atomic.AtomicReference<String> getReplyQueueName();
String getGlobalReplyExchangeName();

String getAppName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@

public class NameGenerator {

public static String fromNameWithSuffix(String appName, String suffix) {
if (suffix != null && !suffix.isEmpty()) {
return appName + "." + suffix;
}
return appName;
}

public static String generateNameFrom(String applicationName, String suffix) {
return generateName(applicationName,suffix);
return generateName(applicationName, suffix);
}

public static String generateNameFrom(String applicationName) {
return generateName(applicationName,"");
return generateName(applicationName, "");
}

private static String generateName(String applicationName, String suffix) {
Expand All @@ -21,7 +28,7 @@ private static String generateName(String applicationName, String suffix) {
.putLong(uuid.getLeastSignificantBits());
// Convert to base64 and remove trailing =
String realSuffix = suffix != null && !"".equals(suffix) ? suffix + "." : "";
return applicationName+"."+ realSuffix + encodeToUrlSafeString(bb.array())
return applicationName + "." + realSuffix + encodeToUrlSafeString(bb.array())
.replace("=", "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ class DefaultObjectMapperSupplierTest {
void shouldMapWithUnknownProperties() throws IOException {
ObjectMapper objectMapper = defaultObjectMapperSupplier.get();

SampleClassExtra base = new SampleClassExtra("23", "one", new Date(), 45l);
SampleClassExtra base = new SampleClassExtra("23", "one", new Date(), 45L);
final String serialized = objectMapper.writeValueAsString(base);

final SampleClass result = objectMapper.readValue(serialized, SampleClass.class);

assertThat(result).isEqualToComparingFieldByField(base);
assertThat(result).usingRecursiveComparison().isEqualTo(base);
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
package org.reactivecommons.async.rabbit.config;

import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.commons.config.BrokerConfig;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

@AllArgsConstructor
public class DirectAsyncGatewayConfig {

private String directMessagesExchangeName;
private String globalReplyExchangeName;
private String appName;

public DirectAsyncGatewayConfig(String directMessagesExchangeName, String appName) {
this.directMessagesExchangeName = directMessagesExchangeName;
this.appName = appName;
}

public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter,
MeterRegistry meterRegistry) throws Exception {
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter, meterRegistry);
}

public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, generateName());
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener, boolean createTopology) {
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, generateName(), globalReplyExchangeName, createTopology);
replyListener.startListening(config.getRoutingKey());
return replyListener;
}
Expand All @@ -48,10 +47,10 @@ public String generateName() {
UUID uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.putLong(uuid.getMostSignificantBits())
.putLong(uuid.getLeastSignificantBits());
.putLong(uuid.getLeastSignificantBits());
// Convert to base64 and remove trailing =
return this.appName + encodeToUrlSafeString(bb.array())
.replaceAll("=", "");
.replaceAll("=", "");
}

public static String encodeToUrlSafeString(byte[] src) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
import org.reactivecommons.async.rabbit.config.EventListenersConfig;
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
import org.reactivecommons.async.rabbit.config.NotificationListenersConfig;
import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -17,7 +17,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificationListenersConfig.class})
@Configuration
public @interface EnableMessageListeners {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
import org.reactivecommons.async.rabbit.config.NotificationListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

Expand All @@ -10,7 +10,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(NotificacionListenersConfig.class)
@Import(NotificationListenersConfig.class)
@Configuration
public @interface EnableNotificationListener {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -16,20 +16,20 @@
@RequiredArgsConstructor
@Import(RabbitMqConfig.class)
public class CommandListenersConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;
private final AsyncPropsDomain asyncPropsDomain;

@Bean
public ApplicationCommandListener applicationCommandListener(ConnectionManager manager,
DomainHandlers handlers,
MessageConverter converter,
CustomReporter errorReporter) {
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN),
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN),
asyncProps.getBrokerConfigProps().getCommandsQueue(), handlers.get(DEFAULT_DOMAIN),
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(),
asyncProps.getCreateTopology(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(),
manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);

commandListener.startListener();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,66 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.async.RabbitEDADirectAsyncGateway;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
import static reactor.rabbitmq.ExchangeSpecification.exchange;

@Log
@Configuration
@Import(RabbitMqConfig.class)
@RequiredArgsConstructor
public class DirectAsyncGatewayConfig {

private final BrokerConfigProps props;

@Bean
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
return new RabbitEDADirectAsyncGateway(config, router, manager, props.getDirectMessagesExchangeName(), converter, meterRegistry);
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,
ConnectionManager manager, MessageConverter converter,
MeterRegistry meterRegistry,
AsyncPropsDomain asyncPropsDomain) {
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
String exchangeName = asyncProps.getBrokerConfigProps().getDirectMessagesExchangeName();
if (asyncProps.getCreateTopology()) {
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("direct")).subscribe();
}
return new RabbitEDADirectAsyncGateway(config, router, manager, exchangeName, converter, meterRegistry);
}

@Bean
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncProps asyncProps, BrokerConfig config, ConnectionManager manager) {
asyncProps.getListenRepliesFrom().add(DEFAULT_DOMAIN);
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncPropsDomain asyncProps,
BrokerConfig config, ConnectionManager manager) {
AtomicReference<ApplicationReplyListener> localListener = new AtomicReference<>();

asyncProps.getListenRepliesFrom().forEach(domain -> {

final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(domain), props.getReplyQueue());
replyListener.startListening(config.getRoutingKey());
asyncProps.forEach((domain, props) -> {
if (props.isListenReplies()) {
final ApplicationReplyListener replyListener =
new ApplicationReplyListener(router, manager.getListener(domain),
props.getBrokerConfigProps().getReplyQueue(),
props.getBrokerConfigProps().getGlobalReplyExchangeName(), props.getCreateTopology());
replyListener.startListening(config.getRoutingKey());

if (DEFAULT_DOMAIN.equals(domain)) {
localListener.set(replyListener);
if (DEFAULT_DOMAIN.equals(domain)) {
localListener.set(replyListener);
}
} else {
log.log(Level.WARNING,"ApplicationReplyListener is disabled in AsyncProps or app.async." + domain
+ ".listenReplies for domain " + domain);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -20,11 +21,14 @@
public class EventBusConfig {

@Bean // app connection
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config,
ObjectMapperSupplier objectMapperSupplier) {
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfig config,
AsyncPropsDomain asyncPropsDomain, ObjectMapperSupplier objectMapperSupplier) {
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
final String exchangeName = props.getDomainEventsExchangeName();
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
AsyncProps asyncProps = asyncPropsDomain.getProps(DEFAULT_DOMAIN);
final String exchangeName = asyncProps.getBrokerConfigProps().getDomainEventsExchangeName();
if (asyncProps.getCreateTopology()) {
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
}
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
manager.setDiscardNotifier(DEFAULT_DOMAIN, createDiscardNotifier(domainEventBus, objectMapperSupplier));
return domainEventBus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -19,28 +19,27 @@
@Import(RabbitMqConfig.class)
public class EventListenersConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;
private final AsyncPropsDomain asyncPropsDomain;

@Bean
public ApplicationEventListener eventListener(MessageConverter messageConverter,
ConnectionManager manager, DomainHandlers handlers,
CustomReporter errorReporter) {
AtomicReference<ApplicationEventListener> external = new AtomicReference<>();
manager.forListener((domain, receiver) -> {
AsyncProps asyncProps = asyncPropsDomain.getProps(domain);
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
appName + ".subsEvents",
asyncProps.getBrokerConfigProps().getEventsQueue(),
asyncProps.getBrokerConfigProps().getDomainEventsExchangeName(),
handlers.get(domain),
asyncProps.getDomain().getEvents().getExchange(),
messageConverter, asyncProps.getWithDLQRetry(),
asyncProps.getCreateTopology(),
asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(),
asyncProps.getDomain().getEvents().getMaxLengthBytes(),
manager.getDiscardNotifier(domain),
errorReporter,
appName);
asyncProps.getAppName());
if (DEFAULT_DOMAIN.equals(domain)) {
external.set(listener);
}
Expand Down
Loading