Skip to content

Refactor ActionListener#map towards Stricter API #65526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
Expand Down Expand Up @@ -139,7 +138,7 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
new GroupedActionListener<>(result.map(v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
arrivedLatches.get(request).countDown();
List<TestRequest> subRequests = request.subRequests;
GroupedActionListener<TestResponse> groupedListener =
new GroupedActionListener<>(ActionListener.map(listener, r -> new TestResponse()), subRequests.size() + 1);
new GroupedActionListener<>(listener.map(r -> new TestResponse()), subRequests.size() + 1);
transportService.getThreadPool().generic().execute(ActionRunnable.supply(groupedListener, () -> {
beforeExecuteLatches.get(request).await();
if (((CancellableTask) task).isCancelled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public void run(){
if (randomBoolean()) {
client().update(ur, new UpdateListener(j));
} else {
client().prepareBulk().add(ur).execute(ActionListener.map(new UpdateListener(j), br -> {
client().prepareBulk().add(ur).execute(new UpdateListener(j).map(br -> {
final BulkItemResponse ir = br.getItems()[0];
if (ir.isFailed()) {
throw ir.getFailure().getCause();
Expand Down
115 changes: 65 additions & 50 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,71 @@ public interface ActionListener<Response> {
*/
void onFailure(Exception e);

/**
* Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along
* exceptions to this instance.
*
* Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure.
*
* If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle
* exceptions from the mapping function {@code fn} but it is the responsibility of {@code delegate} to handle its own exceptions
* inside `onResponse` and `onFailure`.
*
* @param fn Function to apply to listener response
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to this instance
*/
default <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn) {
return new MappedActionListener<>(fn, this);
}

final class MappedActionListener<Response, MappedResponse> implements ActionListener<Response> {

private final CheckedFunction<Response, MappedResponse, Exception> fn;

private final ActionListener<MappedResponse> delegate;

private MappedActionListener(CheckedFunction<Response, MappedResponse, Exception> fn, ActionListener<MappedResponse> delegate) {
this.fn = fn;
this.delegate = delegate;
}

@Override
public void onResponse(Response response) {
MappedResponse mapped;
try {
mapped = fn.apply(response);
} catch (Exception e) {
onFailure(e);
return;
}
try {
delegate.onResponse(mapped);
} catch (RuntimeException e) {
assert false : new AssertionError("map: listener.onResponse failed", e);
throw e;
}
}

@Override
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} catch (RuntimeException ex) {
if (ex != e) {
ex.addSuppressed(e);
}
assert false : new AssertionError("map: listener.onFailure failed", ex);
throw ex;
}
}

@Override
public <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn) {
return new MappedActionListener<>(t -> this.fn.apply(fn.apply(t)), this.delegate);
}
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
Expand Down Expand Up @@ -132,56 +197,6 @@ static <Response> ActionListener<Response> wrap(Runnable runnable) {
return wrap(r -> runnable.run(), e -> runnable.run());
}

/**
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
* exceptions to the delegate.
*
* Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure.
*
* If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle
* exceptions from the mapping function {@code fn} but it is the responsibility of {@code delegate} to handle its own exceptions
* inside `onResponse` and `onFailure`.
*
* @param delegate Listener to delegate to
* @param fn Function to apply to listener response
* @param <Response> Response type of the new listener
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to its delegate listener
*/
static <T, Response> ActionListener<Response> map(ActionListener<T> delegate, CheckedFunction<Response, T, Exception> fn) {
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
T mapped;
try {
mapped = fn.apply(response);
} catch (Exception e) {
onFailure(e);
return;
}
try {
delegate.onResponse(mapped);
} catch (RuntimeException e) {
assert false : new AssertionError("map: listener.onResponse failed", e);
throw e;
}
}

@Override
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} catch (RuntimeException ex) {
if (ex != e) {
ex.addSuppressed(e);
}
assert false : new AssertionError("map: listener.onFailure failed", ex);
throw ex;
}
}
};
}

/**
* Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture}
* api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask
protected void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask, ActionListener<TaskInfo> listener) {
String nodeId = clusterService.localNode().getId();
taskManager.cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(),
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false)));
listener.map(r -> cancellableTask.taskInfo(nodeId, false)));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
@Override
protected void masterOperation(Task task, CleanupRepositoryRequest request, ClusterState state,
ActionListener<CleanupRepositoryResponse> listener) {
cleanupRepo(request.name(), ActionListener.map(listener, CleanupRepositoryResponse::new));
cleanupRepo(request.name(), listener.map(CleanupRepositoryResponse::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
@Override
protected void masterOperation(Task task, final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request, ActionListener.delegateFailure(listener,
(delegatedListener, unregisterRepositoryResponse) ->
delegatedListener.onResponse(AcknowledgedResponse.of(unregisterRepositoryResponse.isAcknowledged()))));
repositoriesService.unregisterRepository(request,
listener.map(unregisterRepositoryResponse -> AcknowledgedResponse.of(unregisterRepositoryResponse.isAcknowledged())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
@Override
protected void masterOperation(Task task, final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
(delegatedListener, response) -> delegatedListener.onResponse(AcknowledgedResponse.of(response.isAcknowledged()))));
repositoriesService.registerRepository(request, listener.map(response -> AcknowledgedResponse.of(response.isAcknowledged())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
@Override
protected void masterOperation(Task task, final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) ->
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
repositoriesService.verifyRepository(request.name(), listener.map(
verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListene

private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.map(listener,
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, listener.map(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ protected ClusterBlockException checkBlock(CloneSnapshotRequest request, Cluster
@Override
protected void masterOperation(Task task, final CloneSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> AcknowledgedResponse.TRUE));
snapshotsService.cloneSnapshot(request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
protected void masterOperation(Task task, final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, Cluste
@Override
protected void masterOperation(Task task, final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshots(request, ActionListener.map(listener, v -> AcknowledgedResponse.TRUE));
snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ private void getMultipleReposSnapshotInfo(SnapshotsInProgress snapshotsInProgres
}
final GroupedActionListener<GetSnapshotsResponse.Response> groupedActionListener =
new GroupedActionListener<>(
ActionListener.map(listener, responses -> {
listener.map(responses -> {
assert repos.size() == responses.size();
return new GetSnapshotsResponse(responses);
}), repos.size());

for (final RepositoryMetadata repo : repos) {
final String repoName = repo.name();
getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose, ActionListener.map(
getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> {
if (e instanceof ElasticsearchException) {
groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e));
} else {
groupedListener.onFailure(e);
}
}), snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)));
}).map(snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected void masterOperation(Task task, final CreateIndexRequest request, fina
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, ActionListener.map(listener, response ->
createIndexService.createIndex(updateRequest, listener.map(response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void masterOperation(Task task, final ResizeRequest resizeRequest, fin
return;
}
createIndexService.createIndex(
updateRequest, ActionListener.map(delegatedListener,
updateRequest, delegatedListener.map(
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,17 +787,16 @@ BulkRequest getBulkRequest() {

ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return ActionListener.map(actionListener,
return actionListener.map(
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
return actionListener.map(response -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots.get(i), response.getItems()[i]);
}
delegatedListener.onResponse(
new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
return new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static void closeContexts(DiscoveryNodes nodes, SearchTransportService se
}
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(listener, (l, rs) -> l.onResponse(Math.toIntExact(rs.stream().filter(r -> r).count()))),
listener.map(rs -> Math.toIntExact(rs.stream().filter(r -> r).count())),
contextIds.size()
);
for (SearchContextIdForNode contextId : contextIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static void collectNodesAndRun(final Iterable<SearchContextIdForNode> scrollIds,
listener.onResponse((cluster, node) -> nodes.get(node));
} else {
RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
remoteClusterService.collectNodes(clusters, ActionListener.map(listener,
remoteClusterService.collectNodes(clusters, listener.map(
nodeFunction -> (clusterAlias, node) -> clusterAlias == null ? nodes.get(node) : nodeFunction.apply(clusterAlias, node)));
}
}
Expand Down
Loading