Skip to content
This repository was archived by the owner on Jun 20, 2025. It is now read-only.

Fixed utilizing all cores for runForAsync #82

Merged
merged 4 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class BenchmarksSettings {

private static final int N_THREADS = Runtime.getRuntime().availableProcessors();
private static final int CONCURRENCY = 16;
private static final Duration EXECUTION_TASK_DURATION = Duration.ofSeconds(60);
private static final Duration EXECUTION_TASK_INTERVAL = Duration.ZERO;
private static final Duration MINIMAL_INTERVAL = Duration.ofMillis(100);
Expand All @@ -32,6 +33,7 @@ public class BenchmarksSettings {
Pattern.compile(ALIAS_PATTERN).asPredicate();

private final int numberThreads;
private final int concurrency;
private final Duration executionTaskDuration;
private final Duration executionTaskInterval;
private final Duration reporterInterval;
Expand Down Expand Up @@ -78,6 +80,7 @@ private BenchmarksSettings(Builder builder) {
this.messagesPerExecutionInterval = builder.messagesPerExecutionInterval;
this.injectors = builder.injectors;
this.messageRate = builder.messageRate;
this.concurrency = builder.concurrency;

this.registry = new MetricRegistry();

Expand Down Expand Up @@ -171,10 +174,15 @@ public int messagesPerExecutionInterval() {
return messagesPerExecutionInterval;
}

public int concurrency() {
return concurrency;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BenchmarksSettings{");
sb.append("numberThreads=").append(numberThreads);
sb.append("concurrency=").append(concurrency);
sb.append(", executionTaskDuration=").append(executionTaskDuration);
sb.append(", executionTaskInterval=").append(executionTaskInterval);
sb.append(", numOfIterations=").append(numOfIterations);
Expand Down Expand Up @@ -218,6 +226,7 @@ public static class Builder {
private int messageRate; // optional
private int injectorsPerRampUpInterval; // calculated
private int messagesPerExecutionInterval; // calculated
private int concurrency = CONCURRENCY;

private Builder() {
this.options = new HashMap<>();
Expand All @@ -240,6 +249,7 @@ private Builder(Builder that) {
this.messagesPerExecutionInterval = that.messagesPerExecutionInterval;
this.injectors = that.injectors;
this.messageRate = that.messageRate;
this.concurrency = that.concurrency;
}

public Builder numberThreads(int numThreads) {
Expand Down Expand Up @@ -307,6 +317,11 @@ public Builder messageRate(int messageRate) {
return this;
}

public Builder concurrency(int concurrency) {
this.concurrency = concurrency;
return this;
}

public BenchmarksSettings build() {
return new BenchmarksSettings(new Builder(this).parseArgs().calculateDynamicParams());
}
Expand Down Expand Up @@ -383,6 +398,9 @@ private Builder parseArgs() {
case "nThreads":
numberThreads(Integer.parseInt(value));
break;
case "concurrency":
concurrency(Integer.parseInt(value));
break;
case "executionTaskDurationInSec":
executionTaskDuration(Duration.ofSeconds(Long.parseLong(value)));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,25 @@ public final void runForAsync(Function<S, Function<Long, Publisher<?>>> func) {
self.start();

Function<Long, Publisher<?>> unitOfWork = func.apply(self);

Flux<Long> fromStream =
Flux.fromStream(LongStream.range(0, settings.numOfIterations()).boxed());

Flux.merge(fromStream.publishOn(scheduler()).map(unitOfWork))
.take(settings.executionTaskDuration())
int threads = settings.numberThreads();
long countPerThread = settings.numOfIterations() / threads;

Function<Integer, Mono<?>> scenarioPerThread =
i ->
Mono.fromRunnable(
() -> {
long start = i * countPerThread;
Flux.fromStream(LongStream.range(start, start + countPerThread).boxed())
.flatMap(unitOfWork::apply, settings.concurrency(), Integer.MAX_VALUE)
.take(settings.executionTaskDuration())
.blockLast();
});

Flux.range(0, threads)
.flatMap(
i -> scenarioPerThread.apply(i).subscribeOn(scheduler()),
Integer.MAX_VALUE,
Integer.MAX_VALUE)
.blockLast();
} finally {
self.shutdown();
Expand Down