Skip to content

Notify refresh listeners on the calling thread #53259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,6 @@ private RefreshListeners buildRefreshListeners() {
return new RefreshListeners(
indexSettings::getMaxRefreshListeners,
() -> refresh("too_many_listeners"),
threadPool.executor(ThreadPool.Names.LISTENER),
logger, threadPool.getThreadContext(),
externalRefreshMetric);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
Expand All @@ -48,7 +47,6 @@
public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
private final IntSupplier getMaxRefreshListeners;
private final Runnable forceRefresh;
private final Executor listenerExecutor;
private final Logger logger;
private final ThreadContext threadContext;
private final MeanMetric refreshMetric;
Expand Down Expand Up @@ -82,11 +80,15 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
*/
private volatile Translog.Location lastRefreshedLocation;

public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefresh, Executor listenerExecutor, Logger logger,
ThreadContext threadContext, MeanMetric refreshMetric) {
public RefreshListeners(
final IntSupplier getMaxRefreshListeners,
final Runnable forceRefresh,
final Logger logger,
final ThreadContext threadContext,
final MeanMetric refreshMetric
) {
this.getMaxRefreshListeners = getMaxRefreshListeners;
this.forceRefresh = forceRefresh;
this.listenerExecutor = listenerExecutor;
this.logger = logger;
this.threadContext = threadContext;
this.refreshMetric = refreshMetric;
Expand Down Expand Up @@ -282,24 +284,22 @@ public void afterRefresh(boolean didRefresh) throws IOException {
}
}
}
// Lastly, fire the listeners that are ready on the listener thread pool
// Lastly, fire the listeners that are ready
fireListeners(listenersToFire);
}

/**
* Fire some listeners. Does nothing if the list of listeners is null.
*/
private void fireListeners(List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
private void fireListeners(final List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
if (listenersToFire != null) {
listenerExecutor.execute(() -> {
for (Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
try {
listener.v2().accept(false);
} catch (Exception e) {
logger.warn("Error firing refresh listener", e);
}
for (final Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
try {
listener.v2().accept(false);
} catch (final Exception e) {
logger.warn("error firing refresh listener", e);
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public void setupListeners() throws Exception {
listeners = new RefreshListeners(
() -> maxListeners,
() -> engine.refresh("too-many-listeners"),
// Immediately run listeners rather than adding them to the listener thread pool like IndexShard does to simplify the test.
Runnable::run,
logger,
threadPool.getThreadContext(),
refreshMetric);
Expand Down