Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920))

### Changed
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))

### Fixed
- Fix unnecessary refreshes on update preparation failures ([#15261](https://github.com/opensearch-project/OpenSearch/issues/15261))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,31 @@ public void onFailure(Exception e) {

assertFalse(threadName.get().contains("listener"));
}

public void testIndexWithCompletableFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final AtomicReference<String> threadName = new AtomicReference<>();
Client client = client();

IndexRequest request = new IndexRequest("test").id("1");
if (randomBoolean()) {
// set the source, without it, we will have a verification failure
request.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
}

client.indexAsync(request).thenAccept(indexResponse -> {
threadName.set(Thread.currentThread().getName());
latch.countDown();
}).exceptionally(error -> {
threadName.set(Thread.currentThread().getName());
failure.set(error);
latch.countDown();
return null;
});

latch.await();

assertFalse(threadName.get().contains("listener"));
}
}
146 changes: 146 additions & 0 deletions server/src/main/java/org/opensearch/transport/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
import org.opensearch.core.action.ActionListener;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* A client provides a one stop interface for performing actions/operations against the cluster.
Expand Down Expand Up @@ -505,4 +507,148 @@ public interface Client extends OpenSearchClient, Releasable {
default Client getRemoteClusterClient(String clusterAlias) {
throw new UnsupportedOperationException("this client doesn't support remote cluster connections");
}

/**
* Index a document - CompletionStage version
*/
default CompletionStage<IndexResponse> indexAsync(IndexRequest request) {
CompletableFuture<IndexResponse> future = new CompletableFuture<>();
index(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Update a document - CompletionStage version
*/
default CompletionStage<UpdateResponse> updateAsync(UpdateRequest request) {
CompletableFuture<UpdateResponse> future = new CompletableFuture<>();
update(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Delete a document - CompletionStage version
*/
default CompletionStage<DeleteResponse> deleteAsync(DeleteRequest request) {
CompletableFuture<DeleteResponse> future = new CompletableFuture<>();
delete(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Bulk operations - CompletionStage version
*/
default CompletionStage<BulkResponse> bulkAsync(BulkRequest request) {
CompletableFuture<BulkResponse> future = new CompletableFuture<>();
bulk(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Get document - CompletionStage version
*/
default CompletionStage<GetResponse> getAsync(GetRequest request) {
CompletableFuture<GetResponse> future = new CompletableFuture<>();
get(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Multi get - CompletionStage version
*/
default CompletionStage<MultiGetResponse> multiGetAsync(MultiGetRequest request) {
CompletableFuture<MultiGetResponse> future = new CompletableFuture<>();
multiGet(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Search - CompletionStage version
*/
default CompletionStage<SearchResponse> searchAsync(SearchRequest request) {
CompletableFuture<SearchResponse> future = new CompletableFuture<>();
search(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Search scroll - CompletionStage version
*/
default CompletionStage<SearchResponse> searchScrollAsync(SearchScrollRequest request) {
CompletableFuture<SearchResponse> future = new CompletableFuture<>();
searchScroll(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Multi search - CompletionStage version
*/
default CompletionStage<MultiSearchResponse> multiSearchAsync(MultiSearchRequest request) {
CompletableFuture<MultiSearchResponse> future = new CompletableFuture<>();
multiSearch(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Term vectors - CompletionStage version
*/
default CompletionStage<TermVectorsResponse> termVectorsAsync(TermVectorsRequest request) {
CompletableFuture<TermVectorsResponse> future = new CompletableFuture<>();
termVectors(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Multi term vectors - CompletionStage version
*/
default CompletionStage<MultiTermVectorsResponse> multiTermVectorsAsync(MultiTermVectorsRequest request) {
CompletableFuture<MultiTermVectorsResponse> future = new CompletableFuture<>();
multiTermVectors(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Explain - CompletionStage version
*/
default CompletionStage<ExplainResponse> explainAsync(ExplainRequest request) {
CompletableFuture<ExplainResponse> future = new CompletableFuture<>();
explain(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Clear scroll - CompletionStage version
*/
default CompletionStage<ClearScrollResponse> clearScrollAsync(ClearScrollRequest request) {
CompletableFuture<ClearScrollResponse> future = new CompletableFuture<>();
clearScroll(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Field capabilities - CompletionStage version
*/
default CompletionStage<FieldCapabilitiesResponse> fieldCapsAsync(FieldCapabilitiesRequest request) {
CompletableFuture<FieldCapabilitiesResponse> future = new CompletableFuture<>();
fieldCaps(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* Search view - CompletionStage version
*/
default CompletionStage<SearchResponse> searchViewAsync(SearchViewAction.Request request) {
CompletableFuture<SearchResponse> future = new CompletableFuture<>();
searchView(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}

/**
* List view names - CompletionStage version
*/
default CompletionStage<ListViewNamesAction.Response> listViewNamesAsync(ListViewNamesAction.Request request) {
CompletableFuture<ListViewNamesAction.Response> future = new CompletableFuture<>();
listViewNames(request, ActionListener.wrap(future::complete, future::completeExceptionally));
return future;
}
}
Loading