Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement MicroProfile Fault Tolerance 3.0 #264

Merged
merged 3 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public <T> T get(Class<T> clazz) {
}

// out-of-band communication between fault tolerance strategies in a single chain
// (only makes sense if different strategies in the chain run on different threads)

private final ConcurrentMap<Class<? extends InvocationContextEvent>, Collection<Consumer<? extends InvocationContextEvent>>> eventHandlers = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public class CancellationEvent implements InvocationContextEvent {
public static final CancellationEvent INSTANCE = new CancellationEvent();

private CancellationEvent() {
} // avoid instantiation
// avoid instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,16 @@

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;

/**
* @author Michal Szynkiewicz, michal.l.szynkiewicz@gmail.com
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to do this when I do larger modifications to the classes, such that I'd have to add another author tag for myself. I find them an unnecessary clutter that convey exactly zero information. Your authorship is carefully preserved in Git history :-)

*/
public abstract class BulkheadBase<V> implements FaultToleranceStrategy<V> {
private final String description;
final FaultToleranceStrategy<V> delegate;
final MetricsRecorder recorder;

BulkheadBase(String description, FaultToleranceStrategy<V> delegate, MetricsRecorder recorder) {
BulkheadBase(String description, FaultToleranceStrategy<V> delegate) {
this.description = description;
this.delegate = delegate;
this.recorder = recorder == null ? MetricsRecorder.NOOP : recorder;
}

BulkheadException bulkheadRejected() {
return new BulkheadException(description + " rejected from bulkhead");
}

public interface MetricsRecorder {
void bulkheadQueueEntered();

void bulkheadQueueLeft(long timeInQueue);

void bulkheadEntered();

void bulkheadRejected();

void bulkheadLeft(long processingTime);

MetricsRecorder NOOP = new MetricsRecorder() {
@Override
public void bulkheadQueueEntered() {
}

@Override
public void bulkheadQueueLeft(long timeInQueue) {
}

@Override
public void bulkheadEntered() {
}

@Override
public void bulkheadRejected() {
}

@Override
public void bulkheadLeft(long processingTime) {
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.smallrye.faulttolerance.core.bulkhead;

import io.smallrye.faulttolerance.core.InvocationContextEvent;

public class BulkheadEvents {
public enum DecisionMade implements InvocationContextEvent {
ACCEPTED(true),
REJECTED(false),
;

public final boolean accepted;

DecisionMade(boolean accepted) {
this.accepted = accepted;
}
}

public enum StartedWaiting implements InvocationContextEvent {
INSTANCE
}

public enum FinishedWaiting implements InvocationContextEvent {
INSTANCE
}

public enum StartedRunning implements InvocationContextEvent {
INSTANCE
}

public enum FinishedRunning implements InvocationContextEvent {
INSTANCE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,18 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

import org.jboss.logging.Logger;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

/**
* @author Michal Szynkiewicz, michal.l.szynkiewicz@gmail.com
*/
public class CompletionStageBulkhead<V> extends BulkheadBase<CompletionStage<V>> {
private static final Logger logger = Logger.getLogger(CompletionStageBulkhead.class);

private final ExecutorService executor;
private final int queueSize;
private final Semaphore workSemaphore;
private final Semaphore capacitySemaphore;

public CompletionStageBulkhead(
FaultToleranceStrategy<CompletionStage<V>> delegate,
String description,
ExecutorService executor,
int size, int queueSize,
MetricsRecorder recorder) {
super(description, delegate, recorder);
public CompletionStageBulkhead(FaultToleranceStrategy<CompletionStage<V>> delegate, String description,
ExecutorService executor, int size, int queueSize) {
super(description, delegate);
workSemaphore = new Semaphore(size);
capacitySemaphore = new Semaphore(size + queueSize);
this.queueSize = queueSize;
Expand All @@ -41,12 +30,13 @@ public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) {
// TODO we shouldn't put tasks into the executor if they immediately block on workSemaphore,
// they should be put into some queue
if (capacitySemaphore.tryAcquire()) {
CompletionStageBulkheadTask task = new CompletionStageBulkheadTask(System.nanoTime(), ctx);
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);
CompletionStageBulkheadTask task = new CompletionStageBulkheadTask(ctx);
executor.execute(task);
recorder.bulkheadQueueEntered();
return task.result;
} else {
recorder.bulkheadRejected();
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
return failedStage(bulkheadRejected());
}
}
Expand All @@ -57,34 +47,32 @@ int getQueueSize() {
}

private class CompletionStageBulkheadTask implements Runnable {
private final long timeEnqueued;
private final CompletableFuture<V> result = new CompletableFuture<>();
private final InvocationContext<CompletionStage<V>> context;
private final InvocationContext<CompletionStage<V>> ctx;

private CompletionStageBulkheadTask(long timeEnqueued,
InvocationContext<CompletionStage<V>> context) {
this.timeEnqueued = timeEnqueued;
this.context = context;
private CompletionStageBulkheadTask(InvocationContext<CompletionStage<V>> ctx) {
this.ctx = ctx;
}

public void run() {
try {
workSemaphore.acquire();
} catch (InterruptedException e) {
// among other occasions, this also happens during shutdown
capacitySemaphore.release();
ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
result.completeExceptionally(e);
return;
}

CompletionStage<V> rawResult;
long startTime = System.nanoTime();
recorder.bulkheadQueueLeft(startTime - timeEnqueued);
recorder.bulkheadEntered();
ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
rawResult = delegate.apply(context);
rawResult = delegate.apply(ctx);
rawResult.whenComplete((value, error) -> {
releaseSemaphores();
recorder.bulkheadLeft(System.nanoTime() - startTime);
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
if (error != null) {
result.completeExceptionally(error);
} else {
Expand All @@ -93,7 +81,7 @@ public void run() {
});
} catch (Exception e) {
releaseSemaphores();
recorder.bulkheadLeft(System.nanoTime() - startTime);
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
result.completeExceptionally(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,24 @@
public class SemaphoreBulkhead<V> extends BulkheadBase<V> {
private final Semaphore semaphore;

public SemaphoreBulkhead(FaultToleranceStrategy<V> delegate, String description, int size,
MetricsRecorder metricsRecorder) {
super(description, delegate, metricsRecorder);
public SemaphoreBulkhead(FaultToleranceStrategy<V> delegate, String description, int size) {
super(description, delegate);
semaphore = new Semaphore(size);
}

@Override
public V apply(InvocationContext<V> ctx) throws Exception {
if (semaphore.tryAcquire()) {
recorder.bulkheadEntered();
long startTime = System.nanoTime();
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
return delegate.apply(ctx);
} finally {
semaphore.release();
recorder.bulkheadLeft(System.nanoTime() - startTime);
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
}
} else {
recorder.bulkheadRejected();
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
throw bulkheadRejected();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,30 @@ public class ThreadPoolBulkhead<V> extends BulkheadBase<Future<V>> {
private final Semaphore capacitySemaphore;
private final int queueSize;

public ThreadPoolBulkhead(
FaultToleranceStrategy<Future<V>> delegate,
String description,
ExecutorService executor,
int size, int queueSize,
MetricsRecorder recorder) {
super(description, delegate, recorder);
public ThreadPoolBulkhead(FaultToleranceStrategy<Future<V>> delegate, String description, ExecutorService executor,
int size, int queueSize) {
super(description, delegate);
capacitySemaphore = new Semaphore(size + queueSize);
this.queueSize = queueSize;
this.executor = executor;
}

@Override
public Future<V> apply(InvocationContext<Future<V>> ctx) throws Exception {
long timeEnqueued = System.nanoTime();
if (capacitySemaphore.tryAcquire()) {
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
BulkheadTask task = new BulkheadTask("ThreadPoolBulkhead", () -> {
long startTime = System.nanoTime();
recorder.bulkheadQueueLeft(startTime - timeEnqueued);
recorder.bulkheadEntered();
ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
return delegate.apply(ctx);
} finally {
recorder.bulkheadLeft(System.nanoTime() - startTime);
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
}
});
ctx.registerEventHandler(CancellationEvent.class, ignored -> task.cancel());
ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);
executor.execute(task);
recorder.bulkheadQueueEntered();

try {
return task.get();
Expand All @@ -68,7 +63,7 @@ public Future<V> apply(InvocationContext<Future<V>> ctx) throws Exception {
throw sneakyThrow(e.getCause());
}
} else {
recorder.bulkheadRejected();
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
throw bulkheadRejected();
}
}
Expand All @@ -79,9 +74,11 @@ int getQueueSize() {
}

private class BulkheadTask extends NamedFutureTask<Future<V>> {
private static final int WAITING = 0, RUNNING = 1, CANCELING = 2;
private static final int WAITING = 0;
private static final int RUNNING = 1;
private static final int CANCELLED = 2;

private AtomicInteger state = new AtomicInteger(WAITING);
private final AtomicInteger state = new AtomicInteger(WAITING);

public BulkheadTask(String name, Callable<Future<V>> callable) {
super(name, callable);
Expand All @@ -99,7 +96,7 @@ public void run() {
}

public void cancel() {
if (state.compareAndSet(WAITING, CANCELING)) {
if (state.compareAndSet(WAITING, CANCELLED)) {
capacitySemaphore.release();
}
}
Expand Down
Loading