Skip to content

Commit 70329d8

Browse files
committed
feat: add prefetch count property
1 parent aa5ef29 commit 70329d8

File tree

7 files changed

+22
-27
lines changed

7 files changed

+22
-27
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.reactivecommons.async.impl.communications.TopologyCreator;
1414
import org.reactivecommons.async.impl.config.props.AsyncProps;
1515
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
16-
import org.reactivecommons.async.impl.config.props.FluxProps;
1716
import org.reactivecommons.async.impl.converters.MessageConverter;
1817
import org.reactivecommons.async.impl.converters.json.DefaultObjectMapperSupplier;
1918
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;
@@ -64,8 +63,6 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M
6463
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
6564
.to(channelPoolOptions::maxCacheSize);
6665

67-
68-
6966
final ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
7067
senderConnection,
7168
channelPoolOptions
@@ -87,7 +84,8 @@ public ReactiveMessageListener messageListener(ConnectionFactoryProvider provide
8784

8885
return new ReactiveMessageListener(receiver,
8986
new TopologyCreator(sender),
90-
asyncProps.getFlux().getMaxConcurrency());
87+
asyncProps.getFlux().getMaxConcurrency(),
88+
asyncProps.getPrefetchCount());
9189
}
9290

9391
@Bean
@@ -99,6 +97,7 @@ public ConnectionFactoryProvider connectionFactory(RabbitProperties properties)
9997
map.from(properties::determinePort).to(factory::setPort);
10098
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
10199
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
100+
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
102101
factory.useNio();
103102
return () -> factory;
104103
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public class AsyncProps {
2222

2323
private Integer maxRetries = 10;
2424

25+
private Integer prefetchCount = 250;
26+
2527
private Integer retryDelay = 1000;
2628

2729
private Boolean withDLQRetry = false;
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,22 @@
11
package org.reactivecommons.async.impl.communications;
22

3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
35
import reactor.rabbitmq.Receiver;
46

7+
8+
@Getter
9+
@RequiredArgsConstructor
510
public class ReactiveMessageListener {
611

712
private final Receiver receiver;
813
private final TopologyCreator topologyCreator;
914
private final Integer maxConcurrency;
15+
private final Integer prefetchCount;
1016

1117
public ReactiveMessageListener(Receiver receiver, TopologyCreator topologyCreator) {
12-
this(receiver, topologyCreator, 250);
13-
}
14-
15-
public ReactiveMessageListener(Receiver receiver, TopologyCreator topologyCreator, Integer maxConcurrency) {
16-
this.receiver = receiver;
17-
this.topologyCreator = topologyCreator;
18-
this.maxConcurrency = maxConcurrency;
19-
}
20-
21-
public TopologyCreator getTopologyCreator() {
22-
return topologyCreator;
23-
}
24-
25-
public Receiver getReceiver() {
26-
return receiver;
18+
this(receiver, topologyCreator, 250, 250);
2719
}
2820

29-
public Integer getMaxConcurrency() {
30-
return maxConcurrency;
31-
}
3221
}
3322

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/GenericMessageListener.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import reactor.core.scheduler.Scheduler;
1414
import reactor.core.scheduler.Schedulers;
1515
import reactor.rabbitmq.AcknowledgableDelivery;
16+
import reactor.rabbitmq.ConsumeOptions;
1617
import reactor.rabbitmq.Receiver;
1718

1819
import java.time.Duration;
@@ -64,8 +65,12 @@ public void startListener() {
6465
} else {
6566
log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
6667
}
68+
69+
ConsumeOptions consumeOptions = new ConsumeOptions();
70+
consumeOptions.qos(messageListener.getPrefetchCount());
71+
6772
setUpBindings(messageListener.getTopologyCreator()).thenMany(
68-
receiver.consumeManualAck(queueName)
73+
receiver.consumeManualAck(queueName, consumeOptions)
6974
.transform(this::consumeFaultTolerant)
7075
.transform(this::outerFailureProtection))
7176
.subscribe();

domain/domain-events/domain-events-api.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,5 @@ bintray {
7272
}
7373

7474
dependencies {
75-
api group: 'org.reactivestreams', name: 'reactive-streams'
76-
}
75+
api group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3'
76+
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=0.4.3
1+
version=0.4.5
22
springBootVersion=2.2.6.RELEASE
33
gradleVersionsVersion=0.28.0
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
spring.application.name=Receiver2
1+
spring.application.name=Receiver2

0 commit comments

Comments
 (0)