Skip to content

Use ThreadedActionListener in ListenableFuture #94367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -43,100 +43,72 @@
public final class ListenableFuture<V> extends PlainActionFuture<V> {

private volatile boolean done = false;
private List<Tuple<ActionListener<V>, ExecutorService>> listeners;
private List<ActionListener<V>> listeners;

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception on the thread completing this future.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
* Adds a listener to this future. If the future has not yet completed, the listener will be notified of a response or exception on the
* thread completing this future. If the future has completed, the listener will be notified immediately without forking to a different
* thread.
*/
public void addListener(ActionListener<V> listener) {
addListener(listener, EsExecutors.DIRECT_EXECUTOR_SERVICE, null);
}

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
* Adds a listener to this future. If the future has not yet completed, the listener will be notified of a response or exception in a
* runnable submitted to the {@link Executor} provided. If the future has completed, the listener will be notified immediately without
* forking to a different thread.
*
* It will apply the provided ThreadContext (if not null) when executing the listening.
* It will restore the provided {@link ThreadContext} (if not null) when completing the listener.
*/
public void addListener(ActionListener<V> listener, ExecutorService executor, ThreadContext threadContext) {
if (done) {
public void addListener(ActionListener<V> listener, Executor executor, @Nullable ThreadContext threadContext) {
if (done || addListenerIfIncomplete(listener, executor, threadContext) == false) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListenerDirectly(listener);
} else {
final boolean run;
// check done under lock since it could have been modified and protect modifications
// to the list under lock
synchronized (this) {
if (done) {
run = true;
} else {
final ActionListener<V> wrappedListener;
if (threadContext == null) {
wrappedListener = listener;
} else {
wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
}
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(new Tuple<>(wrappedListener, executor));
run = false;
}
}

if (run) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListenerDirectly(listener);
}
notifyListener(listener);
}
}

@Override
protected void done(boolean ignored) {
final List<Tuple<ActionListener<V>, ExecutorService>> existingListeners;
synchronized (this) {
done = true;
existingListeners = listeners;
if (existingListeners == null) {
return;
}
listeners = null;
}
for (Tuple<ActionListener<V>, ExecutorService> t : existingListeners) {
final ExecutorService executorService = t.v2();
final ActionListener<V> listener = t.v1();
if (executorService == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
notifyListenerDirectly(listener);
} else {
notifyListener(listener, executorService);
final var existingListeners = acquireExistingListeners();
if (existingListeners != null) {
for (final var listener : existingListeners) {
notifyListener(listener);
}
}
}

private void notifyListenerDirectly(ActionListener<V> listener) {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
assert done;
ActionListener.completeWith(listener, () -> FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS));
private synchronized boolean addListenerIfIncomplete(ActionListener<V> listener, Executor executor, ThreadContext threadContext) {
// check done under lock since it could have been modified; also protect modifications to the list under lock
if (done) {
return false;
}
if (threadContext != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: these operations could be done outside the lock. Not sure if there's enough contention here to make any difference though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there isn't all that much contention here in practice, and it's been like this since it was introduced 4 years ago in #37327.

listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
}
if (executor != EsExecutors.DIRECT_EXECUTOR_SERVICE) {
listener = new ThreadedActionListener<>(executor, listener);
}
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(listener);
return true;
}

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
ActionListener.run(listener, l -> executorService.execute(new Runnable() {
@Override
public void run() {
notifyListenerDirectly(l);
}
private synchronized List<ActionListener<V>> acquireExistingListeners() {
try {
done = true;
return listeners;
} finally {
listeners = null;
}
}

@Override
public String toString() {
return "ListenableFuture notification";
}
}));
private void notifyListener(ActionListener<V> listener) {
assert done;
// call get() in a non-blocking fashion as we could be on a network or scheduler thread which we must not block
ActionListener.completeWith(listener, () -> FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;
Expand All @@ -19,14 +20,16 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class ListenableFutureTests extends ESTestCase {

private ExecutorService executorService;
private ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

@After
public void stopExecutorService() throws InterruptedException {
Expand Down Expand Up @@ -145,4 +148,64 @@ public void testAddedListenersReleasedOnCompletion() {
future.addListener(reachabilityChecker.register(ActionListener.running(() -> {})));
reachabilityChecker.ensureUnreachable();
}

public void testRejection() throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
final EsThreadPoolExecutor executorService = EsExecutors.newFixed(
"testRejection",
1,
1,
EsExecutors.daemonThreadFactory("testRejection"),
threadContext,
false
);

try {
executorService.execute(() -> {
try {
barrier.await(10, TimeUnit.SECONDS); // notify main thread that the executor is blocked
barrier.await(10, TimeUnit.SECONDS); // wait for main thread to release us
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
});

barrier.await(10, TimeUnit.SECONDS); // wait for executor to be blocked

final var listenableFuture = new ListenableFuture<Void>();
final var future1 = new PlainActionFuture<Void>();
final var future2 = new PlainActionFuture<Void>();

listenableFuture.addListener(future1, executorService, null);
listenableFuture.addListener(future2, executorService, null);

final var success = randomBoolean();
if (success) {
listenableFuture.onResponse(null);
} else {
listenableFuture.onFailure(new ElasticsearchException("simulated"));
}

assertFalse(future1.isDone()); // still waiting in the executor queue
assertTrue(future2.isDone()); // rejected from the executor on this thread

barrier.await(10, TimeUnit.SECONDS); // release blocked executor

if (success) {
expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
assertNull(future1.actionGet(10, TimeUnit.SECONDS));
} else {
var exception = expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
assertEquals(1, exception.getSuppressed().length);
assertThat(exception.getSuppressed()[0], instanceOf(ElasticsearchException.class));
assertEquals(
"simulated",
expectThrows(ElasticsearchException.class, () -> future1.actionGet(10, TimeUnit.SECONDS)).getMessage()
);
}
} finally {
barrier.reset();
terminate(executorService);
}
}
}