Skip to content

Refactor ActionListener#map towards Stricter API (#65526) #65650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
Expand Down Expand Up @@ -139,7 +138,7 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
new GroupedActionListener<>(result.map(v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
arrivedLatches.get(request).countDown();
List<TestRequest> subRequests = request.subRequests;
GroupedActionListener<TestResponse> groupedListener =
new GroupedActionListener<>(ActionListener.map(listener, r -> new TestResponse()), subRequests.size() + 1);
new GroupedActionListener<>(listener.map(r -> new TestResponse()), subRequests.size() + 1);
transportService.getThreadPool().generic().execute(ActionRunnable.supply(groupedListener, () -> {
beforeExecuteLatches.get(request).await();
if (((CancellableTask) task).isCancelled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public void run(){
if (randomBoolean()) {
client().update(ur, new UpdateListener(j));
} else {
client().prepareBulk().add(ur).execute(ActionListener.map(new UpdateListener(j), br -> {
client().prepareBulk().add(ur).execute(new UpdateListener(j).map(br -> {
final BulkItemResponse ir = br.getItems()[0];
if (ir.isFailed()) {
throw ir.getFailure().getCause();
Expand Down
115 changes: 65 additions & 50 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,71 @@ public interface ActionListener<Response> {
*/
void onFailure(Exception e);

/**
* Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along
* exceptions to this instance.
*
* Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure.
*
* If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle
* exceptions from the mapping function {@code fn} but it is the responsibility of {@code delegate} to handle its own exceptions
* inside `onResponse` and `onFailure`.
*
* @param fn Function to apply to listener response
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to this instance
*/
default <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn) {
return new MappedActionListener<>(fn, this);
}

final class MappedActionListener<Response, MappedResponse> implements ActionListener<Response> {

private final CheckedFunction<Response, MappedResponse, Exception> fn;

private final ActionListener<MappedResponse> delegate;

private MappedActionListener(CheckedFunction<Response, MappedResponse, Exception> fn, ActionListener<MappedResponse> delegate) {
this.fn = fn;
this.delegate = delegate;
}

@Override
public void onResponse(Response response) {
MappedResponse mapped;
try {
mapped = fn.apply(response);
} catch (Exception e) {
onFailure(e);
return;
}
try {
delegate.onResponse(mapped);
} catch (RuntimeException e) {
assert false : new AssertionError("map: listener.onResponse failed", e);
throw e;
}
}

@Override
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} catch (RuntimeException ex) {
if (ex != e) {
ex.addSuppressed(e);
}
assert false : new AssertionError("map: listener.onFailure failed", ex);
throw ex;
}
}

@Override
public <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn) {
return new MappedActionListener<>(t -> this.fn.apply(fn.apply(t)), this.delegate);
}
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
Expand Down Expand Up @@ -132,56 +197,6 @@ static <Response> ActionListener<Response> wrap(Runnable runnable) {
return wrap(r -> runnable.run(), e -> runnable.run());
}

/**
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
* exceptions to the delegate.
*
* Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure.
*
* If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle
* exceptions from the mapping function {@code fn} but it is the responsibility of {@code delegate} to handle its own exceptions
* inside `onResponse` and `onFailure`.
*
* @param delegate Listener to delegate to
* @param fn Function to apply to listener response
* @param <Response> Response type of the new listener
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to its delegate listener
*/
static <T, Response> ActionListener<Response> map(ActionListener<T> delegate, CheckedFunction<Response, T, Exception> fn) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
T mapped;
try {
mapped = fn.apply(response);
} catch (Exception e) {
onFailure(e);
return;
}
try {
delegate.onResponse(mapped);
} catch (RuntimeException e) {
assert false : new AssertionError("map: listener.onResponse failed", e);
throw e;
}
}

@Override
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} catch (RuntimeException ex) {
if (ex != e) {
ex.addSuppressed(e);
}
assert false : new AssertionError("map: listener.onFailure failed", ex);
throw ex;
}
}
};
}

/**
* Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture}
* api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask
protected void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask, ActionListener<TaskInfo> listener) {
String nodeId = clusterService.localNode().getId();
taskManager.cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(),
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false)));
listener.map(r -> cancellableTask.taskInfo(nodeId, false)));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
protected void masterOperation(CleanupRepositoryRequest request, ClusterState state,
ActionListener<CleanupRepositoryResponse> listener) {
if (state.nodes().getMinNodeVersion().onOrAfter(MIN_VERSION)) {
cleanupRepo(request.name(), ActionListener.map(listener, CleanupRepositoryResponse::new));
cleanupRepo(request.name(), listener.map(CleanupRepositoryResponse::new));
} else {
throw new IllegalArgumentException("Repository cleanup is only supported from version [" + MIN_VERSION
+ "] but the oldest node version in the cluster is [" + state.nodes().getMinNodeVersion() + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
@Override
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request, ActionListener.delegateFailure(listener,
(delegatedListener, unregisterRepositoryResponse) ->
delegatedListener.onResponse(AcknowledgedResponse.of(unregisterRepositoryResponse.isAcknowledged()))));
repositoriesService.unregisterRepository(request,
listener.map(unregisterRepositoryResponse -> AcknowledgedResponse.of(unregisterRepositoryResponse.isAcknowledged())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
(delegatedListener, response) -> delegatedListener.onResponse(AcknowledgedResponse.of(response.isAcknowledged()))));
repositoriesService.registerRepository(request, listener.map(response -> AcknowledgedResponse.of(response.isAcknowledged())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
@Override
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) ->
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
repositoriesService.verifyRepository(request.name(), listener.map(
verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListene

private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.map(listener,
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, listener.map(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public TransportCloneSnapshotAction(TransportService transportService, ClusterSe

@Override
protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> AcknowledgedResponse.TRUE));
snapshotsService.cloneSnapshot(request, listener.map(v -> AcknowledgedResponse.TRUE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ protected void masterOperation(final CreateSnapshotRequest request, ClusterState
final ActionListener<CreateSnapshotResponse> listener) {
if (state.nodes().getMinNodeVersion().before(SnapshotsService.NO_REPO_INITIALIZE_VERSION)) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshotLegacy(request, ActionListener.map(listener, CreateSnapshotResponse::new));
snapshotsService.executeSnapshotLegacy(request, listener.map(CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
snapshotsService.createSnapshotLegacy(request, listener.map(snapshot -> new CreateSnapshotResponse()));
}
} else {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, Cluste
@Override
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshots(request, ActionListener.map(listener, v -> AcknowledgedResponse.TRUE));
snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, ActionListener.map(listener, response ->
createIndexService.createIndex(updateRequest, listener.map(response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest, ActionListener.map(delegatedListener,
updateRequest, delegatedListener.map(
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,17 +796,16 @@ BulkRequest getBulkRequest() {

ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return ActionListener.map(actionListener,
return actionListener.map(
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
return actionListener.map(response -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots.get(i), response.getItems()[i]);
}
delegatedListener.onResponse(
new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
return new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static void closeContexts(DiscoveryNodes nodes, SearchTransportService se
}
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(listener, (l, rs) -> l.onResponse(Math.toIntExact(rs.stream().filter(r -> r).count()))),
listener.map(rs -> Math.toIntExact(rs.stream().filter(r -> r).count())),
contextIds.size()
);
for (SearchContextIdForNode contextId : contextIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static void collectNodesAndRun(final Iterable<SearchContextIdForNode> scrollIds,
listener.onResponse((cluster, node) -> nodes.get(node));
} else {
RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
remoteClusterService.collectNodes(clusters, ActionListener.map(listener,
remoteClusterService.collectNodes(clusters, listener.map(
nodeFunction -> (clusterAlias, node) -> clusterAlias == null ? nodes.get(node) : nodeFunction.apply(clusterAlias, node)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ public void handleException(TransportException exp) {
}, e -> handleException(primaryShardReference, e));

new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
responseListener.map(result -> result.finalResponseIfSuccessful),
newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound,
retryTimeout)
.execute();
Expand Down Expand Up @@ -955,7 +955,7 @@ public void failShard(String reason, Exception e) {
@Override
public void perform(Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
if (Assertions.ENABLED) {
listener = ActionListener.map(listener, result -> {
listener = listener.map(result -> {
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+ "] with a primary failure [" + result.finalFailure + "]";
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
Settings.builder().putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE).build();
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY))
.setSettings(readOnlySettings)
.execute(ActionListener.map(wrappedListener, r -> null));
.execute(wrappedListener.map(r -> null));
}

private static void cleanUpRemovedNodes(ObjectLookupContainer<String> nodesToKeep, Set<String> nodesToCleanUp) {
Expand Down
Loading