Skip to content
This repository was archived by the owner on Jun 20, 2025. It is now read-only.

Calculate scenario dynamic params in settings #66

Merged
merged 12 commits into from
Aug 6, 2018
17 changes: 17 additions & 0 deletions benchmarks-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Test scope -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BenchmarksSettings {
private static final int N_THREADS = Runtime.getRuntime().availableProcessors();
private static final Duration EXECUTION_TASK_DURATION = Duration.ofSeconds(60);
private static final Duration EXECUTION_TASK_INTERVAL = Duration.ZERO;
private static final Duration MINIMAL_INTERVAL = Duration.ofMillis(100);
private static final Duration REPORTER_INTERVAL = Duration.ofSeconds(3);
private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;
private static final TimeUnit RATE_UNIT = TimeUnit.SECONDS;
Expand All @@ -43,6 +44,10 @@ public class BenchmarksSettings {
private final Duration rampUpDuration;
private final Duration rampUpInterval;
private final boolean consoleReporterEnabled;
private final int injectors;
private final int messageRate;
private final int injectorsPerRampUpInterval;
private final int messagesPerExecutionInterval;

private final Map<String, String> options;

Expand All @@ -64,6 +69,10 @@ private BenchmarksSettings(Builder builder) {
this.options = builder.options;
this.durationUnit = builder.durationUnit;
this.rateUnit = builder.rateUnit;
this.injectorsPerRampUpInterval = builder.injectorsPerRampUpInterval;
this.messagesPerExecutionInterval = builder.messagesPerExecutionInterval;
this.injectors = builder.injectors;
this.messageRate = builder.messageRate;

this.registry = new MetricRegistry();

Expand Down Expand Up @@ -139,6 +148,22 @@ public boolean consoleReporterEnabled() {
return consoleReporterEnabled;
}

public int injectors() {
return injectors;
}

public int messageRate() {
return messageRate;
}

public int injectorsPerRampUpInterval() {
return injectorsPerRampUpInterval;
}

public int messagesPerExecutionInterval() {
return messagesPerExecutionInterval;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BenchmarksSettings{");
Expand All @@ -155,6 +180,10 @@ public String toString() {
sb.append(", rampUpInterval=").append(rampUpInterval);
sb.append(", consoleReporterEnabled=").append(consoleReporterEnabled);
sb.append(", registry=").append(registry);
sb.append(", injectors=").append(injectors);
sb.append(", messageRate=").append(messageRate);
sb.append(", injectorsPerRampUpInterval=").append(injectorsPerRampUpInterval);
sb.append(", messagesPerExecutionInterval=").append(messagesPerExecutionInterval);
sb.append(", options=").append(options);
sb.append('}');
return sb.toString();
Expand All @@ -175,9 +204,13 @@ public static class Builder {
private TimeUnit rateUnit = RATE_UNIT;
private long numOfIterations = NUM_OF_ITERATIONS;
private Duration rampUpDuration = RAMP_UP_DURATION;
private Duration rampUpInterval = RAMP_UP_INTERVAL;
private Duration rampUpInterval = RAMP_UP_INTERVAL; // calculated
private boolean consoleReporterEnabled = CONSOLE_REPORTER_ENABLED;
private String[] args = new String[] {};
private int injectors; // optional
private int messageRate; // optional
private int injectorsPerRampUpInterval; // calculated
private int messagesPerExecutionInterval; // calculated

private Builder() {
this.options = new HashMap<>();
Expand All @@ -196,6 +229,10 @@ private Builder(Builder that) {
this.rampUpInterval = that.rampUpInterval;
this.consoleReporterEnabled = that.consoleReporterEnabled;
this.args = that.args;
this.injectorsPerRampUpInterval = that.injectorsPerRampUpInterval;
this.messagesPerExecutionInterval = that.messagesPerExecutionInterval;
this.injectors = that.injectors;
this.messageRate = that.messageRate;
}

public Builder nThreads(int numThreads) {
Expand Down Expand Up @@ -253,8 +290,75 @@ public Builder consoleReporterEnabled(boolean consoleReporterEnabled) {
return this;
}

public Builder injectors(int injectors) {
this.injectors = injectors;
return this;
}

public Builder messageRate(int messageRate) {
this.messageRate = messageRate;
return this;
}

public BenchmarksSettings build() {
return new BenchmarksSettings(new Builder(this).parseArgs());
return new BenchmarksSettings(new Builder(this).parseArgs().calculateDynamicParams());
}

private Builder calculateDynamicParams() {
// if no "injectors specified - don't calculate, means we are
// running another type of scenario and don't want to overload any parameters
if (injectors <= 0 && messageRate <= 0) {
return this;
} else if (injectors <= 0) {
// specify both params
throw new IllegalArgumentException("'injectors' must be greater than 0");
} else if (messageRate <= 0) {
// specify both params
throw new IllegalArgumentException("'messageRate' must be greater than 0");
}

if (rampUpDuration.isZero()) {
throw new IllegalArgumentException("'rampUpDuration' must be greater than 0");
}

if (rampUpDuration.compareTo(executionTaskDuration) > 0) {
throw new IllegalArgumentException("'rampUpDuration' must be greater than 'executionTaskDuration'");
}

// calculate rampup parameters
long rampUpDurationMillis = this.rampUpDuration.toMillis();

if (rampUpDurationMillis / injectors >= MINIMAL_INTERVAL.toMillis()) {
// 1. Can provide rampup injecting 1 injector per minimal interval
this.injectorsPerRampUpInterval = 1;
this.rampUpInterval = Duration.ofMillis(rampUpDurationMillis / injectors);
} else {
// 2. Need to inject multiple injectors per minimal interval to provide rampup
long intervals = Math.floorDiv(rampUpDurationMillis, MINIMAL_INTERVAL.toMillis());
this.injectorsPerRampUpInterval = (int) Math.floorDiv(injectors, intervals);
this.rampUpInterval = MINIMAL_INTERVAL;
}

// calculate execution parameters
double injectorRate = (double) messageRate / injectors;
if (injectorRate <= 1) {
// 1. Enough injectors to provide the required rate sending each injector 1 msg per (>= 1 second)
this.messagesPerExecutionInterval = 1;
this.executionTaskInterval = Duration.ofMillis((long) (1000 / injectorRate));
} else {
int maxInjectorsLoad = (int) Math.floorDiv(injectors * 1000, MINIMAL_INTERVAL.toMillis());
if (maxInjectorsLoad >= messageRate) {
// 2. Still can provide the required rate sending 1 mesg per tick, execution interval = [MIN_INTERVAL, 1 sec]
this.messagesPerExecutionInterval = 1;
this.executionTaskInterval =
Duration.ofMillis(Math.floorDiv(maxInjectorsLoad, messageRate) * MINIMAL_INTERVAL.toMillis());
} else {
// 3. Have to send multiple messages per execution interval , interval already minimum (MIN_INTERVAL)
this.messagesPerExecutionInterval = Math.floorDiv(messageRate, maxInjectorsLoad);
this.executionTaskInterval = MINIMAL_INTERVAL;
}
}
return this;
}

private Builder parseArgs() {
Expand Down Expand Up @@ -288,6 +392,12 @@ private Builder parseArgs() {
case "consoleReporterEnabled":
consoleReporterEnabled(Boolean.parseBoolean(value));
break;
case "injectors":
injectors(Integer.parseInt(value));
break;
case "messageRate":
messageRate(Integer.parseInt(value));
break;
default:
addOption(key, value);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,24 +329,27 @@ public final <T> void runWithRampUp(
int schedulerIndex = (int) ((rampUpIteration & Long.MAX_VALUE) % schedulers().size());
Scheduler scheduler = schedulers().get(schedulerIndex);

// create tasks on selected scheduler
Flux<T> setUpFactory = Flux.create((FluxSink<T> sink) -> {
Flux<T> deferSetUp = Flux.defer(() -> setUp.apply(rampUpIteration, self));
deferSetUp.subscribe(
sink::next,
ex -> {
LOGGER.error("Exception occured on setUp at rampUpIteration: {}, "
+ "cause: {}, task won't start", rampUpIteration, ex);
sink.complete();
},
sink::complete);
});
return setUpFactory
.subscribeOn(scheduler)
.map(setUpResult -> new BenchmarksTask<>(self, setUpResult, unitOfWork, cleanUp, scheduler))
.doOnNext(scheduler::schedule)
.flatMap(BenchmarksTask::completionMono);

return Flux
.range(0, Math.max(1, settings.injectorsPerRampUpInterval()))
.flatMap(iteration1 -> {
// create tasks on selected scheduler
Flux<T> setUpFactory = Flux.create((FluxSink<T> sink) -> {
Flux<T> deferSetUp = Flux.defer(() -> setUp.apply(rampUpIteration, self));
deferSetUp.subscribe(
sink::next,
ex -> {
LOGGER.error("Exception occured on setUp at rampUpIteration: {}, "
+ "cause: {}, task won't start", rampUpIteration, ex);
sink.complete();
},
sink::complete);
});
return setUpFactory
.subscribeOn(scheduler)
.map(setUpResult -> new BenchmarksTask<>(self, setUpResult, unitOfWork, cleanUp, scheduler))
.doOnNext(scheduler::schedule)
.flatMap(BenchmarksTask::completionMono);
});
}, Integer.MAX_VALUE, Integer.MAX_VALUE)
.blockLast();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void run() {
if (isScheduled()) { // executing
long iter = iterationsCounter.incrementAndGet();

Flux.from(unitOfWork.apply(iter, setUpResult))
Flux
.range(0, Math.max(1, benchmarksState.settings.messagesPerExecutionInterval()))
.flatMap(iteration1 -> Flux.from(unitOfWork.apply(iter, setUpResult)))
.doOnError(ex -> LOGGER.warn("Exception occured on unitOfWork at iteration: {}, cause: {}", iter, ex))
.subscribe();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.scalecube.benchmarks;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import java.time.Duration;

class BenchmarksSettingsTest {
private static final String[] EMPTY_ARGS = new String[] {};

@Test
void testOneUserAndSevenMsgRate() {
BenchmarksSettings settings = BenchmarksSettings.from(EMPTY_ARGS)
.injectors(1)
.messageRate(7)
.executionTaskDuration(Duration.ofSeconds(10))
.executionTaskInterval(Duration.ofSeconds(3))
.build();

assertEquals(Duration.ofSeconds(10), settings.rampUpDuration());
assertEquals(Duration.ofSeconds(10), settings.rampUpInterval());
assertEquals(Duration.ofMillis(100), settings.executionTaskInterval());
assertEquals(Duration.ofSeconds(10), settings.executionTaskDuration());
assertEquals(1, settings.injectorsPerRampUpInterval());
assertEquals(1, settings.messagesPerExecutionInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.codahale.metrics.Timer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,14 +18,15 @@ public class RampUpExampleBenchmarksRunner {
*/
public static void main(String[] args) {
BenchmarksSettings settings = BenchmarksSettings.from(args)
.rampUpDuration(Duration.ofSeconds(10))
.rampUpInterval(Duration.ofSeconds(1))
.rampUpDuration(Duration.ofSeconds(30))
.injectors(10_000)
.messageRate(50_000)
.executionTaskDuration(Duration.ofSeconds(30))
.durationUnit(TimeUnit.NANOSECONDS)
.build();

new ExampleServiceBenchmarksState(settings).runWithRampUp(
(rampUpIteration, state) -> Flux.range(1, 3).map(i -> new ServiceCaller(state.exampleService())),
(rampUpIteration, state) -> Mono.just(new ServiceCaller(state.exampleService())),
state -> {
Timer timer = state.timer("timer");
return (iteration, serviceCaller) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public static void main(String[] args) {
.rampUpDuration(Duration.ofSeconds(10))
.rampUpInterval(Duration.ofSeconds(1))
.executionTaskDuration(Duration.ofSeconds(30))
.executionTaskInterval(Duration.ZERO)
.durationUnit(TimeUnit.NANOSECONDS)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.scalecube.benchmarks.examples;

import io.scalecube.benchmarks.BenchmarksSettings;

import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class RampUpSimpleTestRunner {

/**
* Runs example benchmark.
*
* @param args command line args
*/
public static void main(String[] args) {
BenchmarksSettings settings = BenchmarksSettings.from(args)
.injectors(1000)
.messageRate(10_000)
.rampUpDuration(Duration.ofSeconds(15))
.executionTaskDuration(Duration.ofSeconds(30))
.consoleReporterEnabled(false)
.durationUnit(TimeUnit.NANOSECONDS)
.build();

System.out.println("Settings:");
System.out.println(settings);
System.out.println(LocalDateTime.now() + " Test started");

new ExampleServiceBenchmarksState(settings).runWithRampUp(
// set up
(rampUpIteration, state) -> {
// System.out.println(LocalDateTime.now() + " User started: " + rampUpIteration);
return Mono.just(rampUpIteration);
},

// job
state -> (iteration, userId) -> {
// System.out.println(LocalDateTime.now() + " User: " + userId + " | iteration: " + iteration);
;
return Mono.fromRunnable(RampUpSimpleTestRunner::heavy);
},

// teardown
(state, userId) -> {
// System.out.println(LocalDateTime.now() + " User done:" + userId);
return Mono.empty();
});
System.out.println(LocalDateTime.now() + " Test over");
}

private static void heavy() {
for (int i = 0; i < 100; i++) {
// noinspection ResultOfMethodCallIgnored
Math.hypot(20, 29 + i);
}
}
}
Loading