Skip to content
This repository was archived by the owner on Jun 20, 2025. It is now read-only.

Feature/give control when to schedule to the scenario #89

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -306,7 +307,7 @@ public final void runForAsync(Function<S, Function<Long, Publisher<?>>> func) {
*/
public final <T> void runWithRampUp(
BiFunction<Long, S, Publisher<T>> setUp,
Function<S, BiFunction<Long, T, Publisher<?>>> func,
Function<S, Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>>> func,
BiFunction<S, T, Mono<Void>> cleanUp) {

// noinspection unchecked
Expand All @@ -315,28 +316,31 @@ public final <T> void runWithRampUp(
try {
self.start();

BiFunction<Long, T, Publisher<?>> unitOfWork = func.apply(self);
Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>> unitOfWork = func.apply(self);

Flux.interval(Duration.ZERO, settings.rampUpInterval())
.take(settings.rampUpDuration())
.flatMap(
rampUpIteration -> {
// select scheduler and bind tasks to it
int schedulerIndex =
(int) ((rampUpIteration & Long.MAX_VALUE) % schedulers().size());
Scheduler scheduler = schedulers().get(schedulerIndex);

Scheduler scheduler = selectScheduler(rampUpIteration);
return Flux.range(0, Math.max(1, settings.injectorsPerRampUpInterval()))
.flatMap(
iteration1 ->
createSetUpFactory(setUp, self, rampUpIteration)
.subscribeOn(scheduler)
.map(
setUpResult ->
new BenchmarkTask<>(
self, setUpResult, unitOfWork, cleanUp, scheduler))
setUpResult -> {
BiFunction<Long, BenchmarkTask, Publisher<?>> unitOfWork1 =
unitOfWork.apply(setUpResult);

Supplier<Mono<Void>> cleanUp1 =
() -> cleanUp.apply(self, setUpResult);

return new BenchmarkTaskImpl(
self.settings, scheduler, unitOfWork1, cleanUp1);
})
.doOnNext(scheduler::schedule)
.flatMap(BenchmarkTask::completionMono));
.flatMap(BenchmarkTaskImpl::completionMono));
},
Integer.MAX_VALUE,
Integer.MAX_VALUE)
Expand All @@ -346,6 +350,12 @@ public final <T> void runWithRampUp(
}
}

private Scheduler selectScheduler(Long rampUpIteration) {
// select scheduler and bind tasks to it
int schedulerIndex = (int) ((rampUpIteration & Long.MAX_VALUE) % schedulers().size());
return schedulers().get(schedulerIndex);
}

private <T> Flux<T> createSetUpFactory(
BiFunction<Long, S, Publisher<T>> setUp, S self, Long rampUpIteration) {
// create tasks on selected scheduler
Expand Down
Original file line number Diff line number Diff line change
@@ -1,183 +1,14 @@
package io.scalecube.benchmarks;

import static io.scalecube.benchmarks.BenchmarkTask.Status.COMPLETED;
import static io.scalecube.benchmarks.BenchmarkTask.Status.COMPLETING;
import static io.scalecube.benchmarks.BenchmarkTask.Status.SCHEDULED;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

public class BenchmarkTask<S extends BenchmarkState<S>, T> implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkTask.class);

public enum Status {
SCHEDULED,
COMPLETING,
COMPLETED
}

private final S benchmarksState;
private final T setUpResult;
private final BiFunction<Long, T, Publisher<?>> unitOfWork;
private final BiFunction<S, T, Mono<Void>> cleanUp;
private final long numOfIterations;
private final Duration executionTaskDuration;
private final Duration executionTaskInterval;
private final Scheduler scheduler;

private final AtomicLong iterationsCounter = new AtomicLong();
private final AtomicReference<Status> taskStatus = new AtomicReference<>();
private final CompletableFuture<Void> taskCompletionFuture = new CompletableFuture<>();
private final AtomicReference<Disposable> scheduledCompletingTask = new AtomicReference<>();

/**
* Constructs benchmark task.
*
* @param setUpResult a result of setUp function.
* @param unitOfWork an unit of work.
* @param cleanUp a function that should clean up some T's resources.
* @param scheduler a scheduler.
*/
public BenchmarkTask(
S benchmarksState,
T setUpResult,
BiFunction<Long, T, Publisher<?>> unitOfWork,
BiFunction<S, T, Mono<Void>> cleanUp,
Scheduler scheduler) {

this.benchmarksState = benchmarksState;
this.setUpResult = setUpResult;
this.unitOfWork = unitOfWork;
this.cleanUp = cleanUp;
this.scheduler = scheduler;
this.numOfIterations = benchmarksState.settings.numOfIterations();
this.executionTaskDuration = benchmarksState.settings.executionTaskDuration();
this.executionTaskInterval = benchmarksState.settings.executionTaskInterval();
}

@Override
public void run() {
if (isCompleted()) {
return; // this is the end
}

if (iterationsCounter.get() >= numOfIterations) {
LOGGER.debug("Task is completed due to iterations limit: " + numOfIterations);
startCompleting();
return; // this is the end
}

if (isScheduled()) { // executing
long iter = iterationsCounter.incrementAndGet();

//noinspection unchecked
Flux<T> defer = (Flux<T>) Flux.defer(() -> unitOfWork.apply(iter, setUpResult));

Flux.from(defer)
.doOnError(
ex ->
LOGGER.warn(
"Exception occurred on unitOfWork at iteration: {}, cause: {}", iter, ex))
.repeat(Math.max(1, benchmarksState.settings.messagesPerExecutionInterval()))
.doFinally(
signalType -> {
if (executionTaskInterval.isZero()) {
scheduler.schedule(this);
} else {
scheduler.schedule(this, executionTaskInterval.toMillis(), TimeUnit.MILLISECONDS);
}
})
.subscribe();
return;
}

if (setScheduled()) { // scheduled
scheduledCompletingTask.set(
scheduler.schedule(
() -> {
LOGGER.debug(
"Task is completing due to execution duration limit: "
+ executionTaskDuration.toMillis());
startCompleting();
},
executionTaskDuration.toMillis(),
TimeUnit.MILLISECONDS));

LOGGER.debug("Obtained setUp result: {}, now scheduling", setUpResult);
scheduler.schedule(this);
}
}

public Mono<Void> completionMono() {
return Mono.fromFuture(taskCompletionFuture);
}

private boolean setCompleted() {
final boolean compareAndSet = taskStatus.compareAndSet(COMPLETING, COMPLETED);
Disposable disposable = scheduledCompletingTask.get();
if (disposable != null) {
disposable.dispose();
}
LOGGER.debug("Task is completed");
taskCompletionFuture.obtrudeValue(null);
return compareAndSet;
}

private boolean setCompletedWithError(Throwable throwable) {
final boolean compareAndSet = taskStatus.compareAndSet(COMPLETING, COMPLETED);
Disposable disposable = scheduledCompletingTask.get();
if (disposable != null) {
disposable.dispose();
}
LOGGER.error("Task is completed with error: {}", throwable);
taskCompletionFuture.obtrudeException(throwable);
return compareAndSet;
}

private boolean setScheduled() {
return taskStatus.compareAndSet(null, SCHEDULED);
}
public interface BenchmarkTask {

private boolean trySetCompleting() {
return taskStatus.compareAndSet(null, COMPLETING)
|| taskStatus.compareAndSet(SCHEDULED, COMPLETING);
}
BenchmarkSettings settings();

private boolean isCompleted() {
return taskStatus.get() == COMPLETED;
}
void schedule(Duration interval);

private boolean isScheduled() {
return taskStatus.get() == SCHEDULED;
}
void scheduleWithInterval();

private void startCompleting() {
if (trySetCompleting()) {
try {
Mono<Void> voidMono = cleanUp.apply(benchmarksState, setUpResult);
voidMono.subscribe(
empty -> setCompleted(),
ex -> {
LOGGER.error("Exception occured on cleanUp, cause: {}", ex);
setCompletedWithError(ex);
},
this::setCompleted);
} catch (Throwable ex) {
LOGGER.error("Exception occured on cleanUp, cause: {}", ex);
setCompletedWithError(ex);
}
}
}
void scheduleNow();
}
Loading