Skip to content

Commit aa5ef29

Browse files
committed
feat: use configuration properties
1 parent a4d4e1e commit aa5ef29

File tree

11 files changed

+161
-52
lines changed

11 files changed

+161
-52
lines changed

async/async-commons-starter/async-commons-starter.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,8 @@ bintray {
7575
dependencies {
7676
compile project(":async-commons")
7777
api('org.springframework.boot:spring-boot-starter')
78+
79+
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
80+
7881
testImplementation 'io.projectreactor:reactor-test'
7982
}
Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.impl.config;
22

3+
import lombok.RequiredArgsConstructor;
34
import org.reactivecommons.async.api.DefaultCommandHandler;
45
import org.reactivecommons.async.api.DefaultQueryHandler;
56
import org.reactivecommons.async.api.DynamicRegistry;
@@ -12,6 +13,7 @@
1213
import org.reactivecommons.async.impl.HandlerResolver;
1314
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1415
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
16+
import org.reactivecommons.async.impl.config.props.AsyncProps;
1517
import org.reactivecommons.async.impl.converters.MessageConverter;
1618
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1719
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
@@ -22,53 +24,60 @@
2224
import org.springframework.context.annotation.Bean;
2325
import org.springframework.context.annotation.Configuration;
2426
import org.springframework.context.annotation.Import;
25-
import org.springframework.core.env.Environment;
2627
import reactor.core.publisher.Mono;
2728

2829
import java.util.Map;
2930
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.concurrent.ConcurrentMap;
3132

3233
@Configuration
34+
@RequiredArgsConstructor
3335
@Import(RabbitMqConfig.class)
3436
public class MessageListenersConfig {
3537

36-
@Value("${app.async.domain.events.exchange:domainEvents}")
37-
private String domainEventsExchangeName;
38-
3938
@Value("${spring.application.name}")
4039
private String appName;
4140

42-
@Value("${app.async.direct.exchange:directMessages}")
43-
private String directMessagesExchangeName;
44-
45-
@Value("${app.async.maxRetries:10}")
46-
private long maxRetries;
47-
48-
@Value("${app.async.retryDelay:1000}")
49-
private int retryDelay;
41+
private final AsyncProps asyncProps;
5042

51-
@Value("${app.async.withDLQRetry:false}")
52-
private boolean withDLQRetry;
5343

5444
@Bean //TODO: move to own config (QueryListenerConfig)
55-
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter, ReactiveMessageListener receiver, DiscardNotifier discardNotifier) throws Exception {
56-
final ApplicationEventListener listener = new ApplicationEventListener(receiver, appName + ".subsEvents", resolver, domainEventsExchangeName, messageConverter, withDLQRetry, maxRetries, retryDelay, discardNotifier);
45+
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
46+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
47+
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
48+
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
49+
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
50+
discardNotifier);
51+
5752
listener.startListener();
53+
5854
return listener;
5955
}
6056

6157
@Bean //TODO: move to own config (QueryListenerConfig)
62-
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver, ReactiveMessageSender sender, ReactiveMessageListener rlistener, DiscardNotifier discardNotifier) throws Exception {
63-
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener, appName+".query", resolver, sender, directMessagesExchangeName, converter, "globalReply", withDLQRetry, maxRetries, retryDelay, discardNotifier);
58+
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
59+
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
60+
DiscardNotifier discardNotifier) {
61+
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
62+
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
63+
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
64+
asyncProps.getRetryDelay(), discardNotifier);
65+
6466
listener.startListener();
67+
6568
return listener;
6669
}
6770

6871
@Bean
69-
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener, HandlerResolver resolver, MessageConverter converter, DiscardNotifier discardNotifier){
70-
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, directMessagesExchangeName, converter, withDLQRetry, maxRetries, retryDelay, discardNotifier);
72+
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
73+
HandlerResolver resolver, MessageConverter converter,
74+
DiscardNotifier discardNotifier) {
75+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
76+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
77+
asyncProps.getRetryDelay(), discardNotifier);
78+
7179
commandListener.startListener();
80+
7281
return commandListener;
7382
}
7483

@@ -78,26 +87,26 @@ public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessage
7887
}
7988

8089
@Bean
81-
public HandlerResolver resolver(ApplicationContext context, DefaultQueryHandler defaultHandler, Environment env, DefaultCommandHandler defaultCommandHandler) {
90+
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
8291
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
8392

8493
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
85-
.values().stream()
86-
.flatMap(r -> r.getHandlers().stream())
87-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
88-
ConcurrentHashMap::putAll);
94+
.values().stream()
95+
.flatMap(r -> r.getHandlers().stream())
96+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
97+
ConcurrentHashMap::putAll);
8998

9099
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
91-
.values().stream()
92-
.flatMap(r -> r.getEventListeners().stream())
93-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
94-
ConcurrentHashMap::putAll);
100+
.values().stream()
101+
.flatMap(r -> r.getEventListeners().stream())
102+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
103+
ConcurrentHashMap::putAll);
95104

96105
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
97-
.values().stream()
98-
.flatMap(r -> r.getCommandHandlers().stream())
99-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
100-
ConcurrentHashMap::putAll);
106+
.values().stream()
107+
.flatMap(r -> r.getCommandHandlers().stream())
108+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
109+
ConcurrentHashMap::putAll);
101110

102111
return new HandlerResolver(handlers, eventListeners, commandHandlers) {
103112
@Override
@@ -113,13 +122,13 @@ public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
113122
@ConditionalOnMissingBean
114123
public DefaultQueryHandler defaultHandler() {
115124
return (DefaultQueryHandler<Object, Object>) command ->
116-
Mono.error(new RuntimeException("No Handler Registered"));
125+
Mono.error(new RuntimeException("No Handler Registered"));
117126
}
118127

119128

120129
@Bean
121130
@ConditionalOnMissingBean
122-
public DefaultCommandHandler defaultCommandHandler(){
131+
public DefaultCommandHandler defaultCommandHandler() {
123132
return message -> Mono.error(new RuntimeException("No Handler Registered"));
124133
}
125134
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.rabbitmq.client.Connection;
44
import com.rabbitmq.client.ConnectionFactory;
5+
import lombok.RequiredArgsConstructor;
56
import lombok.extern.java.Log;
67
import org.reactivecommons.api.domain.DomainEventBus;
78
import org.reactivecommons.async.impl.DiscardNotifier;
@@ -10,7 +11,9 @@
1011
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1112
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1213
import org.reactivecommons.async.impl.communications.TopologyCreator;
14+
import org.reactivecommons.async.impl.config.props.AsyncProps;
1315
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
16+
import org.reactivecommons.async.impl.config.props.FluxProps;
1417
import org.reactivecommons.async.impl.converters.MessageConverter;
1518
import org.reactivecommons.async.impl.converters.json.DefaultObjectMapperSupplier;
1619
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;
@@ -24,6 +27,7 @@
2427
import org.springframework.context.annotation.Import;
2528
import reactor.core.publisher.Mono;
2629
import reactor.rabbitmq.*;
30+
import reactor.util.retry.Retry;
2731

2832
import java.time.Duration;
2933
import java.util.logging.Level;
@@ -32,17 +36,19 @@
3236

3337
@Log
3438
@Configuration
35-
@EnableConfigurationProperties(RabbitProperties.class)
39+
@RequiredArgsConstructor
40+
@EnableConfigurationProperties({
41+
RabbitProperties.class,
42+
AsyncProps.class
43+
})
3644
@Import(BrokerConfigProps.class)
3745
public class RabbitMqConfig {
3846

3947
private static final String LISTENER_TYPE = "listener";
4048

4149
private static final String SENDER_TYPE = "sender";
4250

43-
44-
@Value("${app.async.flux.maxConcurrency:250}")
45-
private Integer maxConcurrency;
51+
private final AsyncProps asyncProps;
4652

4753
@Value("${spring.application.name}")
4854
private String appName;
@@ -58,6 +64,8 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M
5864
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
5965
.to(channelPoolOptions::maxCacheSize);
6066

67+
68+
6169
final ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
6270
senderConnection,
6371
channelPoolOptions
@@ -77,7 +85,9 @@ public ReactiveMessageListener messageListener(ConnectionFactoryProvider provide
7785
final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection));
7886
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection));
7987

80-
return new ReactiveMessageListener(receiver, new TopologyCreator(sender), maxConcurrency);
88+
return new ReactiveMessageListener(receiver,
89+
new TopologyCreator(sender),
90+
asyncProps.getFlux().getMaxConcurrency());
8191
}
8292

8393
@Bean
@@ -89,7 +99,6 @@ public ConnectionFactoryProvider connectionFactory(RabbitProperties properties)
8999
map.from(properties::determinePort).to(factory::setPort);
90100
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
91101
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
92-
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
93102
factory.useNio();
94103
return () -> factory;
95104
}
@@ -123,7 +132,8 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
123132
.doOnError(err ->
124133
log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", err)
125134
)
126-
.retryBackoff(Long.MAX_VALUE, Duration.ofMillis(300), Duration.ofMillis(3000))
135+
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(300))
136+
.maxBackoff(Duration.ofMillis(3000)))
127137
.cache();
128138
}
129139

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import org.springframework.boot.context.properties.ConfigurationProperties;
6+
import org.springframework.boot.context.properties.NestedConfigurationProperty;
7+
8+
9+
@Getter
10+
@Setter
11+
@ConfigurationProperties(prefix = "app.async")
12+
public class AsyncProps {
13+
14+
@NestedConfigurationProperty
15+
private FluxProps flux = new FluxProps();
16+
17+
@NestedConfigurationProperty
18+
private DomainProps domain = new DomainProps();
19+
20+
@NestedConfigurationProperty
21+
private DirectProps direct = new DirectProps();
22+
23+
private Integer maxRetries = 10;
24+
25+
private Integer retryDelay = 1000;
26+
27+
private Boolean withDLQRetry = false;
28+
29+
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.reactivecommons.async.impl.config.props;
22

33
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
45
import org.reactivecommons.async.impl.config.IBrokerConfigProps;
56
import org.springframework.beans.factory.annotation.Value;
67
import org.springframework.context.annotation.Configuration;
@@ -13,16 +14,13 @@
1314

1415
@Getter
1516
@Configuration
17+
@RequiredArgsConstructor
1618
public class BrokerConfigProps implements IBrokerConfigProps {
1719

1820
@Value("${spring.application.name}")
1921
private String appName;
2022

21-
@Value("${app.async.domain.events.exchange:domainEvents}")
22-
private String domainEventsExchangeName;
23-
24-
@Value("${app.async.direct.exchange:directMessages}")
25-
private String directMessagesExchangeName;
23+
private final AsyncProps asyncProps;
2624

2725
private final AtomicReference<String> replyQueueName = new AtomicReference<>();
2826

@@ -45,7 +43,7 @@ public String getCommandsQueue() {
4543
public String getReplyQueue() {
4644
final String name = replyQueueName.get();
4745
if (name == null) {
48-
final String replyName = newRandomQueueName();
46+
final String replyName = newRandomQueueName();
4947
if (replyQueueName.compareAndSet(null, replyName)) {
5048
return replyName;
5149
} else {
@@ -55,13 +53,23 @@ public String getReplyQueue() {
5553
return name;
5654
}
5755

56+
@Override
57+
public String getDomainEventsExchangeName() {
58+
return asyncProps.getDomain().getEvents().getExchange();
59+
}
60+
61+
@Override
62+
public String getDirectMessagesExchangeName() {
63+
return asyncProps.getDirect().getExchange();
64+
}
65+
5866
private String newRandomQueueName() {
5967
UUID uuid = UUID.randomUUID();
6068
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
6169
bb.putLong(uuid.getMostSignificantBits())
62-
.putLong(uuid.getLeastSignificantBits());
70+
.putLong(uuid.getLeastSignificantBits());
6371
return appName + Base64Utils.encodeToUrlSafeString(bb.array())
64-
.replaceAll("=", "");
72+
.replaceAll("=", "");
6573
}
6674

6775
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
6+
@Getter
7+
@Setter
8+
public class DirectProps {
9+
10+
private String exchange = "directMessages";
11+
12+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import org.springframework.boot.context.properties.NestedConfigurationProperty;
6+
7+
@Getter
8+
@Setter
9+
public class DomainProps {
10+
11+
@NestedConfigurationProperty
12+
private EventsProps events = new EventsProps();
13+
14+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
6+
@Getter
7+
@Setter
8+
public class EventsProps {
9+
10+
private String exchange = "domainEvents";
11+
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
6+
@Getter
7+
@Setter
8+
public class FluxProps {
9+
10+
private Integer maxConcurrency = 250;
11+
12+
}

0 commit comments

Comments
 (0)