Skip to content
This repository was archived by the owner on Jun 20, 2025. It is now read-only.

Prepare new release #64

Merged
merged 36 commits into from
Aug 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ecd414e
++++ Prepare for next development iteration build: 316 ++++
io-scalecube-ci Aug 1, 2018
abeee54
move calculation of dynamic parameters to Settings builder.
Aug 2, 2018
087f77e
modify runWithRampUp behavior so it now uses dynamic parameters
Aug 2, 2018
2fe1f2a
Some changes
Aug 2, 2018
ab001e6
Add validation of parameters
Aug 2, 2018
5b0b42c
+ script files
io-scalecube-ci Aug 3, 2018
56f4872
add support of new parameters in command line
Aug 3, 2018
c8adb79
add support of new parameters in command line
Aug 3, 2018
cda73e6
Minor
segabriel Aug 3, 2018
14ef2de
Added test
segabriel Aug 3, 2018
c73d14d
Merge pull request #65 from scalecube/travis-ci-cd
artem-v Aug 6, 2018
72ab0ce
rename users -> injectors
Aug 6, 2018
714fe5c
+ script files
io-scalecube-ci Aug 6, 2018
49380a0
allow to set all settings parameters manually
Aug 6, 2018
d9a0f66
Merge branch 'develop' into calculate-scenario-dynamic-params-in-sett…
artem-v Aug 6, 2018
16f1bed
Removed MDC
segabriel Aug 6, 2018
1e7ea75
Added onstartup policy (a new log file is created every time the JVM …
segabriel Aug 6, 2018
eec0bdb
Added separated module
segabriel Aug 6, 2018
8952108
Cosmetic update after CR
artem-v Aug 6, 2018
652399f
Fixed review issues
segabriel Aug 6, 2018
9cb3a63
Merge pull request #66 from scalecube/calculate-scenario-dynamic-para…
artem-v Aug 6, 2018
bc88306
Merge branch 'develop' into travis-ci-cd
artem-v Aug 6, 2018
31c54f2
Merge branch 'develop' into feature/removing-mdc
artem-v Aug 6, 2018
e0e2fff
Merge pull request #67 from scalecube/travis-ci-cd
artem-v Aug 6, 2018
42505fb
Added async appender
segabriel Aug 6, 2018
8a0f991
Merge remote-tracking branch 'origin/feature/removing-mdc' into featu…
segabriel Aug 6, 2018
4e5cb76
Merge branch 'develop' into feature/removing-mdc
segabriel Aug 6, 2018
1ee0a34
Changed on jctools async appender queue
segabriel Aug 6, 2018
81d9e85
Merge remote-tracking branch 'origin/feature/removing-mdc' into featu…
segabriel Aug 6, 2018
32a77bc
Added immediateFlush="false"
segabriel Aug 6, 2018
ee26de2
Minor
segabriel Aug 6, 2018
7881092
Cosmetic update after CR
artem-v Aug 6, 2018
eb1e841
Added console xml example
segabriel Aug 6, 2018
f46bb63
CR changes
artem-v Aug 7, 2018
a88c5ae
Merge pull request #68 from scalecube/feature/removing-mdc
artem-v Aug 7, 2018
a87fb52
Merge branch 'master' into develop
artem-v Aug 7, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions benchmarks-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Test scope -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BenchmarksSettings {
private static final int N_THREADS = Runtime.getRuntime().availableProcessors();
private static final Duration EXECUTION_TASK_DURATION = Duration.ofSeconds(60);
private static final Duration EXECUTION_TASK_INTERVAL = Duration.ZERO;
private static final Duration MINIMAL_INTERVAL = Duration.ofMillis(100);
private static final Duration REPORTER_INTERVAL = Duration.ofSeconds(3);
private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;
private static final TimeUnit RATE_UNIT = TimeUnit.SECONDS;
Expand All @@ -43,6 +44,10 @@ public class BenchmarksSettings {
private final Duration rampUpDuration;
private final Duration rampUpInterval;
private final boolean consoleReporterEnabled;
private final int injectors;
private final int messageRate;
private final int injectorsPerRampUpInterval;
private final int messagesPerExecutionInterval;

private final Map<String, String> options;

Expand All @@ -64,6 +69,10 @@ private BenchmarksSettings(Builder builder) {
this.options = builder.options;
this.durationUnit = builder.durationUnit;
this.rateUnit = builder.rateUnit;
this.injectorsPerRampUpInterval = builder.injectorsPerRampUpInterval;
this.messagesPerExecutionInterval = builder.messagesPerExecutionInterval;
this.injectors = builder.injectors;
this.messageRate = builder.messageRate;

this.registry = new MetricRegistry();

Expand Down Expand Up @@ -139,6 +148,22 @@ public boolean consoleReporterEnabled() {
return consoleReporterEnabled;
}

public int injectors() {
return injectors;
}

public int messageRate() {
return messageRate;
}

public int injectorsPerRampUpInterval() {
return injectorsPerRampUpInterval;
}

public int messagesPerExecutionInterval() {
return messagesPerExecutionInterval;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BenchmarksSettings{");
Expand All @@ -155,6 +180,10 @@ public String toString() {
sb.append(", rampUpInterval=").append(rampUpInterval);
sb.append(", consoleReporterEnabled=").append(consoleReporterEnabled);
sb.append(", registry=").append(registry);
sb.append(", injectors=").append(injectors);
sb.append(", messageRate=").append(messageRate);
sb.append(", injectorsPerRampUpInterval=").append(injectorsPerRampUpInterval);
sb.append(", messagesPerExecutionInterval=").append(messagesPerExecutionInterval);
sb.append(", options=").append(options);
sb.append('}');
return sb.toString();
Expand All @@ -175,9 +204,13 @@ public static class Builder {
private TimeUnit rateUnit = RATE_UNIT;
private long numOfIterations = NUM_OF_ITERATIONS;
private Duration rampUpDuration = RAMP_UP_DURATION;
private Duration rampUpInterval = RAMP_UP_INTERVAL;
private Duration rampUpInterval = RAMP_UP_INTERVAL; // calculated
private boolean consoleReporterEnabled = CONSOLE_REPORTER_ENABLED;
private String[] args = new String[] {};
private int injectors; // optional
private int messageRate; // optional
private int injectorsPerRampUpInterval; // calculated
private int messagesPerExecutionInterval; // calculated

private Builder() {
this.options = new HashMap<>();
Expand All @@ -196,6 +229,10 @@ private Builder(Builder that) {
this.rampUpInterval = that.rampUpInterval;
this.consoleReporterEnabled = that.consoleReporterEnabled;
this.args = that.args;
this.injectorsPerRampUpInterval = that.injectorsPerRampUpInterval;
this.messagesPerExecutionInterval = that.messagesPerExecutionInterval;
this.injectors = that.injectors;
this.messageRate = that.messageRate;
}

public Builder nThreads(int numThreads) {
Expand Down Expand Up @@ -253,8 +290,75 @@ public Builder consoleReporterEnabled(boolean consoleReporterEnabled) {
return this;
}

public Builder injectors(int injectors) {
this.injectors = injectors;
return this;
}

public Builder messageRate(int messageRate) {
this.messageRate = messageRate;
return this;
}

public BenchmarksSettings build() {
return new BenchmarksSettings(new Builder(this).parseArgs());
return new BenchmarksSettings(new Builder(this).parseArgs().calculateDynamicParams());
}

private Builder calculateDynamicParams() {
// if no "injectors specified - don't calculate, means we are
// running another type of scenario and don't want to overload any parameters
if (injectors <= 0 && messageRate <= 0) {
return this;
} else if (injectors <= 0) {
// specify both params
throw new IllegalArgumentException("'injectors' must be greater than 0");
} else if (messageRate <= 0) {
// specify both params
throw new IllegalArgumentException("'messageRate' must be greater than 0");
}

if (rampUpDuration.isZero()) {
throw new IllegalArgumentException("'rampUpDuration' must be greater than 0");
}

if (rampUpDuration.compareTo(executionTaskDuration) > 0) {
throw new IllegalArgumentException("'rampUpDuration' must be greater than 'executionTaskDuration'");
}

// calculate rampup parameters
long rampUpDurationMillis = this.rampUpDuration.toMillis();

if (rampUpDurationMillis / injectors >= MINIMAL_INTERVAL.toMillis()) {
// 1. Can provide rampup injecting 1 injector per minimal interval
this.injectorsPerRampUpInterval = 1;
this.rampUpInterval = Duration.ofMillis(rampUpDurationMillis / injectors);
} else {
// 2. Need to inject multiple injectors per minimal interval to provide rampup
long intervals = Math.floorDiv(rampUpDurationMillis, MINIMAL_INTERVAL.toMillis());
this.injectorsPerRampUpInterval = (int) Math.floorDiv(injectors, intervals);
this.rampUpInterval = MINIMAL_INTERVAL;
}

// calculate execution parameters
double injectorRate = (double) messageRate / injectors;
if (injectorRate <= 1) {
// 1. Enough injectors to provide the required rate sending each injector 1 msg per (>= 1 second)
this.messagesPerExecutionInterval = 1;
this.executionTaskInterval = Duration.ofMillis((long) (1000 / injectorRate));
} else {
int maxInjectorsLoad = (int) Math.floorDiv(injectors * 1000, MINIMAL_INTERVAL.toMillis());
if (maxInjectorsLoad >= messageRate) {
// 2. Still can provide the required rate sending 1 mesg per tick, execution interval = [MIN_INTERVAL, 1 sec]
this.messagesPerExecutionInterval = 1;
this.executionTaskInterval =
Duration.ofMillis(Math.floorDiv(maxInjectorsLoad, messageRate) * MINIMAL_INTERVAL.toMillis());
} else {
// 3. Have to send multiple messages per execution interval , interval already minimum (MIN_INTERVAL)
this.messagesPerExecutionInterval = Math.floorDiv(messageRate, maxInjectorsLoad);
this.executionTaskInterval = MINIMAL_INTERVAL;
}
}
return this;
}

private Builder parseArgs() {
Expand Down Expand Up @@ -288,6 +392,12 @@ private Builder parseArgs() {
case "consoleReporterEnabled":
consoleReporterEnabled(Boolean.parseBoolean(value));
break;
case "injectors":
injectors(Integer.parseInt(value));
break;
case "messageRate":
messageRate(Integer.parseInt(value));
break;
default:
addOption(key, value);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
Expand All @@ -37,7 +34,6 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;


/**
* BenchmarksState is the state of the benchmark. it gives you the analogy of the beginning, and ending of the test. It
* can run both sync or async way using the {@link #runForSync(Function)} and {@link #runForAsync(Function)}
Expand All @@ -57,8 +53,6 @@ public class BenchmarksState<SELF extends BenchmarksState<SELF>> {

private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarksState.class);

private static final String MDC_BENCHMARK_DIR = "benchmarkDir";

protected final BenchmarksSettings settings;

private Scheduler scheduler;
Expand Down Expand Up @@ -89,8 +83,6 @@ public final void start() {
throw new IllegalStateException("BenchmarksState is already started");
}

MDC.put(MDC_BENCHMARK_DIR, settings.csvReporterDirectory().toString());

LOGGER.info("Benchmarks settings: " + settings);

if (settings.consoleReporterEnabled()) {
Expand All @@ -107,11 +99,11 @@ public final void start() {
.build(settings.csvReporterDirectory());

scheduler = Schedulers.fromExecutor(
Executors.newFixedThreadPool(settings.nThreads(), newThreadFactory()));
Executors.newFixedThreadPool(settings.nThreads()));

schedulers = IntStream.rangeClosed(1, settings.nThreads())
.mapToObj(i -> Schedulers.fromExecutorService(
Executors.newSingleThreadScheduledExecutor(newThreadFactory())))
Executors.newSingleThreadScheduledExecutor()))
.collect(Collectors.toList());

try {
Expand Down Expand Up @@ -170,8 +162,6 @@ public final void shutdown() {
afterAll();
} catch (Exception ex) {
throw new IllegalStateException("BenchmarksState afterAll() failed: " + ex, ex);
} finally {
MDC.remove(MDC_BENCHMARK_DIR);
}
}

Expand Down Expand Up @@ -329,48 +319,31 @@ public final <T> void runWithRampUp(
int schedulerIndex = (int) ((rampUpIteration & Long.MAX_VALUE) % schedulers().size());
Scheduler scheduler = schedulers().get(schedulerIndex);

// create tasks on selected scheduler
Flux<T> setUpFactory = Flux.create((FluxSink<T> sink) -> {
Flux<T> deferSetUp = Flux.defer(() -> setUp.apply(rampUpIteration, self));
deferSetUp.subscribe(
sink::next,
ex -> {
LOGGER.error("Exception occured on setUp at rampUpIteration: {}, "
+ "cause: {}, task won't start", rampUpIteration, ex);
sink.complete();
},
sink::complete);
});
return setUpFactory
.subscribeOn(scheduler)
.map(setUpResult -> new BenchmarksTask<>(self, setUpResult, unitOfWork, cleanUp, scheduler))
.doOnNext(scheduler::schedule)
.flatMap(BenchmarksTask::completionMono);

return Flux
.range(0, Math.max(1, settings.injectorsPerRampUpInterval()))
.flatMap(iteration1 -> {
// create tasks on selected scheduler
Flux<T> setUpFactory = Flux.create((FluxSink<T> sink) -> {
Flux<T> deferSetUp = Flux.defer(() -> setUp.apply(rampUpIteration, self));
deferSetUp.subscribe(
sink::next,
ex -> {
LOGGER.error("Exception occured on setUp at rampUpIteration: {}, "
+ "cause: {}, task won't start", rampUpIteration, ex);
sink.complete();
},
sink::complete);
});
return setUpFactory
.subscribeOn(scheduler)
.map(setUpResult -> new BenchmarksTask<>(self, setUpResult, unitOfWork, cleanUp, scheduler))
.doOnNext(scheduler::schedule)
.flatMap(BenchmarksTask::completionMono);
});
}, Integer.MAX_VALUE, Integer.MAX_VALUE)
.blockLast();
} finally {
self.shutdown();
}
}

private ThreadFactory newThreadFactory() {
return runnable -> {
Map<String, String> mdcCopy = MDC.getCopyOfContextMap();

return new Thread(runnable) {

private boolean mdcWasSet;

@Override
public void run() {
if (!mdcWasSet) {
MDC.setContextMap(mdcCopy);
mdcWasSet = true;
}
super.run();
}
};
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void run() {
if (isScheduled()) { // executing
long iter = iterationsCounter.incrementAndGet();

Flux.from(unitOfWork.apply(iter, setUpResult))
Flux
.range(0, Math.max(1, benchmarksState.settings.messagesPerExecutionInterval()))
.flatMap(iteration1 -> Flux.from(unitOfWork.apply(iter, setUpResult)))
.doOnError(ex -> LOGGER.warn("Exception occured on unitOfWork at iteration: {}, cause: {}", iter, ex))
.subscribe();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.scalecube.benchmarks;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import java.time.Duration;

class BenchmarksSettingsTest {
private static final String[] EMPTY_ARGS = new String[] {};

@Test
void testOneUserAndSevenMsgRate() {
BenchmarksSettings settings = BenchmarksSettings.from(EMPTY_ARGS)
.injectors(1)
.messageRate(7)
.executionTaskDuration(Duration.ofSeconds(10))
.executionTaskInterval(Duration.ofSeconds(3))
.build();

assertEquals(Duration.ofSeconds(10), settings.rampUpDuration());
assertEquals(Duration.ofSeconds(10), settings.rampUpInterval());
assertEquals(Duration.ofMillis(100), settings.executionTaskInterval());
assertEquals(Duration.ofSeconds(10), settings.executionTaskDuration());
assertEquals(1, settings.injectorsPerRampUpInterval());
assertEquals(1, settings.messagesPerExecutionInterval());
}
}
13 changes: 3 additions & 10 deletions benchmarks-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-benchmarks-log4j2</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

Expand Down
Loading