Skip to content

Drop action future that forks on listener executor #53261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package org.elasticsearch.action.support;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -46,17 +43,6 @@ public static <T> PlainListenableActionFuture<T> newListenableFuture() {
return new PlainListenableActionFuture<>();
}

/**
* This method returns a listenable future. The listeners will be called on completion of the future.
* The listeners will be executed on the LISTENER thread pool.
* @param threadPool the thread pool used to execute listeners
* @param <T> the result of the future
* @return a listenable future
*/
public static <T> PlainListenableActionFuture<T> newDispatchingListenableFuture(ThreadPool threadPool) {
return new DispatchingListenableActionFuture<>(threadPool);
}

@Override
public void addListener(final ActionListener<T> listener) {
internalAddListener(listener);
Expand Down Expand Up @@ -121,18 +107,4 @@ private void executeListener(final ActionListener<T> listener) {
}
}

private static final class DispatchingListenableActionFuture<T> extends PlainListenableActionFuture<T> {

private static final Logger logger = LogManager.getLogger(DispatchingListenableActionFuture.class);
private final ThreadPool threadPool;

private DispatchingListenableActionFuture(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public void addListener(final ActionListener<T> listener) {
super.addListener(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ public class ListenableActionFutureTests extends ESTestCase {
public void testListenerIsCallableFromNetworkThreads() throws Throwable {
ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads");
try {
final PlainListenableActionFuture<Object> future;
if (randomBoolean()) {
future = PlainListenableActionFuture.newDispatchingListenableFuture(threadPool);
} else {
future = PlainListenableActionFuture.newListenableFuture();
}
final PlainListenableActionFuture<Object> future = PlainListenableActionFuture.newListenableFuture();
final CountDownLatch listenerCalled = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
final Object response = new Object();
Expand Down