Skip to content

Commit ca48796

Browse files
authored
Merge pull request #91 from reactive-commons/fix/circular-beans-dependency
fix circular dependency
2 parents 95b8658 + 2dbe46b commit ca48796

File tree

17 files changed

+165
-80
lines changed

17 files changed

+165
-80
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ public class CommandListenersConfig {
2424

2525
@Bean
2626
public ApplicationCommandListener applicationCommandListener(ConnectionManager manager,
27+
DomainHandlers handlers,
2728
MessageConverter converter,
2829
CustomReporter errorReporter) {
29-
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, manager.getHandlerResolver(DEFAULT_DOMAIN),
30+
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN),
3031
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
3132
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
3233

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import lombok.Getter;
55
import lombok.Setter;
66
import org.reactivecommons.async.commons.DiscardNotifier;
7-
import org.reactivecommons.async.rabbit.HandlerResolver;
87
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
98
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
109

@@ -20,7 +19,6 @@ public class ConnectionManager {
2019
public static class DomainConnections {
2120
private final ReactiveMessageListener listener;
2221
private final ReactiveMessageSender sender;
23-
private final HandlerResolver handlerResolver;
2422
private final ConnectionFactoryProvider provider;
2523
@Setter
2624
private DiscardNotifier discardNotifier;
@@ -35,35 +33,39 @@ public void forListener(BiConsumer<String, ReactiveMessageListener> consumer) {
3533
}
3634

3735
public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) {
38-
connections.get(domain).setDiscardNotifier(discardNotifier);
36+
getChecked(domain).setDiscardNotifier(discardNotifier);
3937
}
4038

41-
public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender,
42-
HandlerResolver resolver) {
39+
public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender) {
4340
connections.put(domain, DomainConnections.builder()
4441
.listener(listener)
4542
.sender(sender)
46-
.handlerResolver(resolver)
4743
.build());
4844
return this;
4945
}
5046

5147
public ReactiveMessageSender getSender(String domain) {
52-
return connections.get(domain).getSender();
48+
return getChecked(domain).getSender();
5349
}
5450

5551
public ReactiveMessageListener getListener(String domain) {
56-
return connections.get(domain).getListener();
52+
return getChecked(domain).getListener();
5753
}
5854

59-
public DiscardNotifier getDiscardNotifier(String domain) {
60-
return connections.get(domain).getDiscardNotifier();
55+
private DomainConnections getChecked(String domain) {
56+
DomainConnections domainConnections = connections.get(domain);
57+
if (domainConnections == null) {
58+
throw new RuntimeException("You are trying to use the domain " + domain
59+
+ " but this connection is not defined");
60+
}
61+
return domainConnections;
6162
}
6263

63-
public HandlerResolver getHandlerResolver(String domain) {
64-
return connections.get(domain).getHandlerResolver();
64+
public DiscardNotifier getDiscardNotifier(String domain) {
65+
return getChecked(domain).getDiscardNotifier();
6566
}
67+
6668
public ConnectionFactoryProvider getProvider(String domain) {
67-
return connections.get(domain).getProvider();
69+
return getChecked(domain).getProvider();
6870
}
6971
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.rabbit.config;
2+
3+
import org.reactivecommons.async.rabbit.HandlerResolver;
4+
5+
import java.util.Map;
6+
import java.util.TreeMap;
7+
8+
public class DomainHandlers {
9+
private final Map<String, HandlerResolver> handlers = new TreeMap<>();
10+
11+
public void add(String domain, HandlerResolver resolver) {
12+
this.handlers.put(domain, resolver);
13+
}
14+
15+
public HandlerResolver get(String domain) {
16+
HandlerResolver handlerResolver = handlers.get(domain);
17+
if (handlerResolver == null) {
18+
throw new RuntimeException("You are trying to use the domain " + domain
19+
+ " but this connection is not defined");
20+
}
21+
return handlerResolver;
22+
}
23+
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ public class EventListenersConfig {
2626

2727
@Bean
2828
public ApplicationEventListener eventListener(MessageConverter messageConverter,
29-
ConnectionManager manager, CustomReporter errorReporter) {
29+
ConnectionManager manager, DomainHandlers handlers,
30+
CustomReporter errorReporter) {
3031
AtomicReference<ApplicationEventListener> external = new AtomicReference<>();
3132
manager.forListener((domain, receiver) -> {
3233
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
3334
appName + ".subsEvents",
34-
manager.getHandlerResolver(domain),
35+
handlers.get(domain),
3536
asyncProps.getDomain().getEvents().getExchange(),
3637
messageConverter, asyncProps.getWithDLQRetry(),
3738
asyncProps.getMaxRetries(),

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import lombok.AccessLevel;
44
import lombok.NoArgsConstructor;
5+
import lombok.extern.log4j.Log4j2;
56
import org.reactivecommons.async.api.DefaultCommandHandler;
67
import org.reactivecommons.async.api.HandlerRegistry;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
@@ -17,6 +18,7 @@
1718
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1819

1920
@NoArgsConstructor(access = AccessLevel.PRIVATE)
21+
@Log4j2
2022
public class HandlerResolverBuilder {
2123

2224
public static HandlerResolver buildResolver(String domain,
@@ -75,7 +77,13 @@ private static ConcurrentMap<String, RegisteredEventListener<?>> getEventHandler
7577
// event handlers and dynamic handlers
7678
return registries
7779
.values().stream()
78-
.flatMap(r -> Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r)))
80+
.flatMap(r -> {
81+
if (r.getDomainEventListeners().containsKey(domain)) {
82+
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
83+
}
84+
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
85+
return Stream.empty();
86+
})
7987
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
8088
ConcurrentHashMap::putAll);
8189
}
@@ -90,7 +98,13 @@ private static Stream<RegisteredEventListener<?>> getDynamics(String domain, Han
9098
private static ConcurrentMap<String, RegisteredEventListener<?>> getEventsToBind(String domain, Map<String, HandlerRegistry> registries) {
9199
return registries
92100
.values().stream()
93-
.flatMap(r -> r.getDomainEventListeners().get(domain).stream())
101+
.flatMap(r -> {
102+
if (r.getDomainEventListeners().containsKey(domain)) {
103+
return r.getDomainEventListeners().get(domain).stream();
104+
}
105+
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
106+
return Stream.empty();
107+
})
94108
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
95109
ConcurrentHashMap::putAll);
96110
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ public class NotificacionListenersConfig {
2424

2525
@Bean
2626
public ApplicationNotificationListener eventNotificationListener(ConnectionManager manager,
27+
DomainHandlers handlers,
2728
MessageConverter messageConverter,
2829
CustomReporter errorReporter) {
2930
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
3031
manager.getListener(DEFAULT_DOMAIN),
3132
asyncProps.getDomain().getEvents().getExchange(),
3233
asyncProps.getNotificationProps().getQueueName(appName),
33-
manager.getHandlerResolver(DEFAULT_DOMAIN),
34+
handlers.get(DEFAULT_DOMAIN),
3435
messageConverter,
3536
manager.getDiscardNotifier(DEFAULT_DOMAIN),
3637
errorReporter);

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ public class QueryListenerConfig {
2424

2525
@Bean
2626
public ApplicationQueryListener queryListener(MessageConverter converter,
27+
DomainHandlers handlers,
2728
ConnectionManager manager,
2829
CustomReporter errorReporter) {
2930
final ApplicationQueryListener listener = new ApplicationQueryListener(manager.getListener(DEFAULT_DOMAIN),
30-
appName + ".query", manager.getHandlerResolver(DEFAULT_DOMAIN), manager.getSender(DEFAULT_DOMAIN), asyncProps.getDirect().getExchange(), converter,
31+
appName + ".query", handlers.get(DEFAULT_DOMAIN), manager.getSender(DEFAULT_DOMAIN), asyncProps.getDirect().getExchange(), converter,
3132
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
3233
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(),
3334
asyncProps.getDirect().isDiscardTimeoutQueries(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,34 @@ public class RabbitMqConfig {
6060
public ConnectionManager buildConnectionManager(@Value("${spring.application.name}") String appName,
6161
AsyncProps props,
6262
RabbitProperties defaultAppProps,
63-
MessageConverter converter,
64-
ApplicationContext context,
65-
HandlerRegistry primaryRegistry,
66-
DefaultCommandHandler<?> commandHandler) {
63+
MessageConverter converter) {
6764
ConnectionManager connectionManager = new ConnectionManager();
65+
props.getConnections().computeIfAbsent(DEFAULT_DOMAIN, k -> defaultAppProps);
66+
props.getConnections()
67+
.forEach((domain, properties) -> {
68+
ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties);
69+
ReactiveMessageSender sender = createMessageSender(appName, provider, properties, converter);
70+
ReactiveMessageListener listener = createMessageListener(appName, provider, props);
71+
connectionManager.addDomain(domain, listener, sender);
72+
});
73+
return connectionManager;
74+
}
75+
76+
@Bean
77+
public DomainHandlers buildHandlers(AsyncProps props, RabbitProperties defaultAppProps, ApplicationContext context,
78+
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
79+
DomainHandlers handlers = new DomainHandlers();
6880
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
6981
if (!registries.containsValue(primaryRegistry)) {
7082
registries.put("primaryHandlerRegistry", primaryRegistry);
7183
}
7284
props.getConnections().computeIfAbsent(DEFAULT_DOMAIN, k -> defaultAppProps);
7385
props.getConnections()
7486
.forEach((domain, properties) -> {
75-
ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties);
76-
ReactiveMessageSender sender = createMessageSender(appName, provider, properties, converter);
77-
ReactiveMessageListener listener = createMessageListener(appName, provider, props);
7887
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domain, registries, commandHandler);
79-
connectionManager.addDomain(domain, listener, sender, resolver);
88+
handlers.add(domain, resolver);
8089
});
81-
return connectionManager;
90+
return handlers;
8291
}
8392

8493

@@ -187,8 +196,9 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
187196
}
188197

189198
@Bean
190-
public DynamicRegistry dynamicRegistry(ConnectionManager connectionManager, IBrokerConfigProps props) {
191-
return new DynamicRegistryImp(connectionManager.getHandlerResolver(DEFAULT_DOMAIN),
199+
public DynamicRegistry dynamicRegistry(ConnectionManager connectionManager, DomainHandlers handlers,
200+
IBrokerConfigProps props) {
201+
return new DynamicRegistryImp(handlers.get(DEFAULT_DOMAIN),
192202
connectionManager.getListener(DEFAULT_DOMAIN).getTopologyCreator(), props);
193203
}
194204

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class CommandListenersConfigTest {
4040
private final CustomReporter customReporter = mock(CustomReporter.class);
4141
private final Receiver receiver = mock(Receiver.class);
4242
private final ConnectionManager manager = new ConnectionManager();
43+
private final DomainHandlers handlers = new DomainHandlers();
4344

4445
@BeforeEach
4546
public void init() throws NoSuchFieldException, IllegalAccessException {
@@ -53,12 +54,13 @@ public void init() throws NoSuchFieldException, IllegalAccessException {
5354
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5455
when(listener.getReceiver()).thenReturn(receiver);
5556
when(listener.getMaxConcurrency()).thenReturn(20);
56-
manager.addDomain(DEFAULT_DOMAIN, listener, null, handlerResolver);
57+
manager.addDomain(DEFAULT_DOMAIN, listener, null);
58+
handlers.add(DEFAULT_DOMAIN, handlerResolver);
5759
}
5860

5961
@Test
6062
void applicationCommandListener() {
61-
final ApplicationCommandListener commandListener = config.applicationCommandListener(manager, messageConverter, customReporter);
63+
final ApplicationCommandListener commandListener = config.applicationCommandListener(manager, handlers, messageConverter, customReporter);
6264
Assertions.assertThat(commandListener).isNotNull();
6365
}
6466
}

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class EventListenersConfigTest {
3939
private final CustomReporter customReporter = mock(CustomReporter.class);
4040
private final Receiver receiver = mock(Receiver.class);
4141
private ConnectionManager connectionManager;
42+
private final DomainHandlers handlers = new DomainHandlers();
4243

4344
@BeforeEach
4445
public void init() {
@@ -54,14 +55,16 @@ public void init() {
5455
when(listener.getReceiver()).thenReturn(receiver);
5556
when(listener.getMaxConcurrency()).thenReturn(20);
5657
connectionManager = new ConnectionManager();
57-
connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender, handlerResolver);
58+
connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender);
59+
handlers.add(HandlerRegistry.DEFAULT_DOMAIN, handlerResolver);
5860
}
5961

6062
@Test
6163
void eventListener() {
6264
final ApplicationEventListener eventListener = config.eventListener(
6365
messageConverter,
6466
connectionManager,
67+
handlers,
6568
customReporter
6669
);
6770

0 commit comments

Comments
 (0)