Skip to content

Commit 4621e51

Browse files
Philipp Fehresideshowcoder
authored andcommitted
Move event emitting off the main thread to avoid deadlocks
When stacking event emitting inside an EventProvider, when using sychronization the EventProvider can deadlock, to avoid this move the event emitting of the main thread. Signed-off-by: Philipp Fehre <philipp@fehre.co.uk>
1 parent b797883 commit 4621e51

File tree

5 files changed

+155
-6
lines changed

5 files changed

+155
-6
lines changed

src/main/java/dev/openfeature/sdk/EventProvider.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package dev.openfeature.sdk;
22

33
import dev.openfeature.sdk.internal.TriConsumer;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
import java.util.concurrent.TimeUnit;
7+
import lombok.extern.slf4j.Slf4j;
48

59
/**
610
* Abstract EventProvider. Providers must extend this class to support events.
@@ -14,8 +18,10 @@
1418
*
1519
* @see FeatureProvider
1620
*/
21+
@Slf4j
1722
public abstract class EventProvider implements FeatureProvider {
1823
private EventProviderListener eventProviderListener;
24+
private final ExecutorService emitterExecutor = Executors.newCachedThreadPool();
1925

2026
void setEventProviderListener(EventProviderListener eventProviderListener) {
2127
this.eventProviderListener = eventProviderListener;
@@ -46,6 +52,24 @@ void detach() {
4652
this.onEmit = null;
4753
}
4854

55+
/**
56+
* Stop the event emitter executor and block until either termination has completed
57+
* or timeout period has elapsed.
58+
*/
59+
@Override
60+
public void shutdown() {
61+
emitterExecutor.shutdown();
62+
try {
63+
if (!emitterExecutor.awaitTermination(EventSupport.SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
64+
log.warn("Emitter executor did not terminate before the timeout period had elapsed");
65+
emitterExecutor.shutdownNow();
66+
}
67+
} catch (InterruptedException e) {
68+
emitterExecutor.shutdownNow();
69+
Thread.currentThread().interrupt();
70+
}
71+
}
72+
4973
/**
5074
* Emit the specified {@link ProviderEvent}.
5175
*
@@ -56,8 +80,10 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
5680
if (eventProviderListener != null) {
5781
eventProviderListener.onEmit(event, details);
5882
}
59-
if (this.onEmit != null) {
60-
this.onEmit.accept(this, event, details);
83+
84+
final TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> localOnEmit = this.onEmit;
85+
if (localOnEmit != null) {
86+
emitterExecutor.submit(() -> localOnEmit.accept(this, event, details));
6187
}
6288
}
6389

src/main/java/dev/openfeature/sdk/EventSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
@Slf4j
2020
class EventSupport {
2121

22+
public static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
23+
2224
// we use a v4 uuid as a "placeholder" for anonymous clients, since
2325
// ConcurrentHashMap doesn't support nulls
2426
private static final String defaultClientUuid = UUID.randomUUID().toString();
25-
private static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
2627
private final Map<String, HandlerStore> handlerStores = new ConcurrentHashMap<>();
2728
private final HandlerStore globalHandlerStore = new HandlerStore();
2829
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> {
2930
final Thread thread = new Thread(runnable);
30-
thread.setDaemon(true);
3131
return thread;
3232
});
3333

src/test/java/dev/openfeature/sdk/EventProviderTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@
22

33
import static org.junit.jupiter.api.Assertions.assertThrows;
44
import static org.mockito.ArgumentMatchers.any;
5-
import static org.mockito.Mockito.*;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
69

710
import dev.openfeature.sdk.internal.TriConsumer;
11+
import dev.openfeature.sdk.testutils.TestStackedEmitCallsProvider;
12+
import io.cucumber.java.AfterAll;
813
import lombok.SneakyThrows;
914
import org.junit.jupiter.api.BeforeEach;
1015
import org.junit.jupiter.api.DisplayName;
1116
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.Timeout;
1218

1319
class EventProviderTest {
1420

@@ -21,6 +27,11 @@ void setup() {
2127
eventProvider.initialize(null);
2228
}
2329

30+
@AfterAll
31+
public static void resetDefaultProvider() {
32+
OpenFeatureAPI.getInstance().setProviderAndWait(new NoOpProvider());
33+
}
34+
2435
@Test
2536
@DisplayName("should run attached onEmit with emitters")
2637
void emitsEventsWhenAttached() {
@@ -75,6 +86,15 @@ void doesNotThrowWhenOnEmitSame() {
7586
eventProvider.attach(onEmit2); // should not throw, same instance. noop
7687
}
7788

89+
@Test
90+
@SneakyThrows
91+
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
92+
@DisplayName("should not deadlock on emit called during emit")
93+
void doesNotDeadlockOnEmitStackedCalls() {
94+
TestStackedEmitCallsProvider provider = new TestStackedEmitCallsProvider();
95+
OpenFeatureAPI.getInstance().setProviderAndWait(provider);
96+
}
97+
7898
static class TestEventProvider extends EventProvider {
7999

80100
private static final String NAME = "TestEventProvider";

src/test/java/dev/openfeature/sdk/EventsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
class EventsTest {
2121

22-
private static final int TIMEOUT = 300;
22+
private static final int TIMEOUT = 500;
2323
private static final int INIT_DELAY = TIMEOUT / 2;
2424

2525
@AfterAll
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package dev.openfeature.sdk.testutils;
2+
3+
import dev.openfeature.sdk.EvaluationContext;
4+
import dev.openfeature.sdk.EventProvider;
5+
import dev.openfeature.sdk.Metadata;
6+
import dev.openfeature.sdk.ProviderEvaluation;
7+
import dev.openfeature.sdk.ProviderEvent;
8+
import dev.openfeature.sdk.ProviderEventDetails;
9+
import dev.openfeature.sdk.Value;
10+
import java.util.function.Consumer;
11+
12+
public class TestStackedEmitCallsProvider extends EventProvider {
13+
private final NestedBlockingEmitter nestedBlockingEmitter = new NestedBlockingEmitter(this::onProviderEvent);
14+
15+
@Override
16+
public Metadata getMetadata() {
17+
return () -> getClass().getSimpleName();
18+
}
19+
20+
@Override
21+
public void initialize(EvaluationContext evaluationContext) throws Exception {
22+
synchronized (nestedBlockingEmitter) {
23+
nestedBlockingEmitter.init();
24+
while (!nestedBlockingEmitter.isReady()) {
25+
try {
26+
nestedBlockingEmitter.wait();
27+
} catch (InterruptedException e) {
28+
}
29+
}
30+
}
31+
}
32+
33+
private void onProviderEvent(ProviderEvent providerEvent) {
34+
synchronized (nestedBlockingEmitter) {
35+
if (providerEvent == ProviderEvent.PROVIDER_READY) {
36+
nestedBlockingEmitter.setReady();
37+
/*
38+
* This line deadlocked in the original implementation without the emitterExecutor see
39+
* https://github.com/open-feature/java-sdk/issues/1299
40+
*/
41+
emitProviderReady(ProviderEventDetails.builder().build());
42+
}
43+
}
44+
}
45+
46+
@Override
47+
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
48+
throw new UnsupportedOperationException("Unimplemented method 'getBooleanEvaluation'");
49+
}
50+
51+
@Override
52+
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
53+
throw new UnsupportedOperationException("Unimplemented method 'getStringEvaluation'");
54+
}
55+
56+
@Override
57+
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
58+
throw new UnsupportedOperationException("Unimplemented method 'getIntegerEvaluation'");
59+
}
60+
61+
@Override
62+
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
63+
throw new UnsupportedOperationException("Unimplemented method 'getDoubleEvaluation'");
64+
}
65+
66+
@Override
67+
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
68+
throw new UnsupportedOperationException("Unimplemented method 'getObjectEvaluation'");
69+
}
70+
71+
static class NestedBlockingEmitter {
72+
73+
private final Consumer<ProviderEvent> emitProviderEvent;
74+
private volatile boolean isReady;
75+
76+
public NestedBlockingEmitter(Consumer<ProviderEvent> emitProviderEvent) {
77+
this.emitProviderEvent = emitProviderEvent;
78+
}
79+
80+
public void init() {
81+
// run init outside monitored thread
82+
new Thread(() -> {
83+
try {
84+
Thread.sleep(500);
85+
} catch (InterruptedException e) {
86+
throw new RuntimeException(e);
87+
}
88+
89+
emitProviderEvent.accept(ProviderEvent.PROVIDER_READY);
90+
})
91+
.start();
92+
}
93+
94+
public boolean isReady() {
95+
return isReady;
96+
}
97+
98+
public synchronized void setReady() {
99+
isReady = true;
100+
this.notifyAll();
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)