Skip to content

Commit

Permalink
Use FluentFuture in QueuedStatementResource
Browse files Browse the repository at this point in the history
This makes the code easier to read, but does not change behavior.
  • Loading branch information
electrum committed Aug 4, 2021
1 parent cb33b01 commit 55811f2
Showing 1 changed file with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -66,12 +66,12 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addTimeout;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse;
import static io.trino.execution.QueryState.FAILED;
Expand Down Expand Up @@ -197,7 +197,7 @@ public Response postStatement(
// let authentication filter know that identity lifecycle has been handed off
servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);

return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled);
return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo));
}

@ResourceSecurity(PUBLIC)
Expand All @@ -214,25 +214,21 @@ public void getStatus(
{
Query query = getQuery(queryId, slug, token);

// wait for query to be dispatched, up to the wait timeout
ListenableFuture<Void> futureStateChange = addTimeout(
query.waitForDispatched(),
() -> null,
WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
timeoutExecutor);

// when state changes, fetch the next result
ListenableFuture<QueryResults> queryResultsFuture = Futures.transform(
futureStateChange,
ignored -> query.getQueryResults(token, uriInfo),
responseExecutor);

// transform to Response
ListenableFuture<Response> response = Futures.transform(
queryResultsFuture,
queryResults -> createQueryResultsResponse(queryResults, compressionEnabled),
directExecutor());
bindAsyncResponse(asyncResponse, response, responseExecutor);
ListenableFuture<Response> future = getStatus(query, token, maxWait, uriInfo);
bindAsyncResponse(asyncResponse, future, responseExecutor);
}

private ListenableFuture<Response> getStatus(Query query, long token, Duration maxWait, UriInfo uriInfo)
{
long waitMillis = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait).toMillis();

return FluentFuture.from(query.waitForDispatched())
// wait for query to be dispatched, up to the wait timeout
.withTimeout(waitMillis, MILLISECONDS, timeoutExecutor)
.catching(TimeoutException.class, ignored -> null, directExecutor())
// when state changes, fetch the next result
.transform(ignored -> query.getQueryResults(token, uriInfo), responseExecutor)
.transform(this::createQueryResultsResponse, directExecutor());
}

@ResourceSecurity(PUBLIC)
Expand All @@ -258,7 +254,7 @@ private Query getQuery(QueryId queryId, String slug, long token)
return query;
}

private static Response createQueryResultsResponse(QueryResults results, boolean compressionEnabled)
private Response createQueryResultsResponse(QueryResults results)
{
Response.ResponseBuilder builder = Response.ok(results);
if (!compressionEnabled) {
Expand Down

0 comments on commit 55811f2

Please sign in to comment.