Skip to content

Reduce query lock contention #18023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.client.ProtocolHeaders;
import io.trino.client.QueryResults;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.QueryManager;
import io.trino.operator.DirectExchangeClientSupplier;
Expand Down Expand Up @@ -220,52 +219,52 @@ private void asyncQueryResults(
else {
targetResultSize = Ordering.natural().min(targetResultSize, MAX_TARGET_RESULT_SIZE);
}
ListenableFuture<QueryResults> queryResultsFuture = query.waitForResults(token, uriInfo, wait, targetResultSize);
ListenableFuture<QueryResultsResponse> queryResultsFuture = query.waitForResults(token, uriInfo, wait, targetResultSize);

ListenableFuture<Response> response = Futures.transform(queryResultsFuture, queryResults -> toResponse(query, queryResults), directExecutor());
ListenableFuture<Response> response = Futures.transform(queryResultsFuture, this::toResponse, directExecutor());

bindAsyncResponse(asyncResponse, response, responseExecutor);
}

private Response toResponse(Query query, QueryResults queryResults)
private Response toResponse(QueryResultsResponse resultsResponse)
{
ResponseBuilder response = Response.ok(queryResults);
ResponseBuilder response = Response.ok(resultsResponse.queryResults());

ProtocolHeaders protocolHeaders = query.getProtocolHeaders();
query.getSetCatalog().ifPresent(catalog -> response.header(protocolHeaders.responseSetCatalog(), catalog));
query.getSetSchema().ifPresent(schema -> response.header(protocolHeaders.responseSetSchema(), schema));
query.getSetPath().ifPresent(path -> response.header(protocolHeaders.responseSetPath(), path));
ProtocolHeaders protocolHeaders = resultsResponse.protocolHeaders();
resultsResponse.setCatalog().ifPresent(catalog -> response.header(protocolHeaders.responseSetCatalog(), catalog));
resultsResponse.setSchema().ifPresent(schema -> response.header(protocolHeaders.responseSetSchema(), schema));
resultsResponse.setPath().ifPresent(path -> response.header(protocolHeaders.responseSetPath(), path));

// add set session properties
query.getSetSessionProperties()
resultsResponse.setSessionProperties()
.forEach((key, value) -> response.header(protocolHeaders.responseSetSession(), key + '=' + urlEncode(value)));

// add clear session properties
query.getResetSessionProperties()
resultsResponse.resetSessionProperties()
.forEach(name -> response.header(protocolHeaders.responseClearSession(), name));

// add set roles
query.getSetRoles()
resultsResponse.setRoles()
.forEach((key, value) -> response.header(protocolHeaders.responseSetRole(), key + '=' + urlEncode(value.toString())));

// add added prepare statements
for (Entry<String, String> entry : query.getAddedPreparedStatements().entrySet()) {
for (Entry<String, String> entry : resultsResponse.addedPreparedStatements().entrySet()) {
String encodedKey = urlEncode(entry.getKey());
String encodedValue = urlEncode(preparedStatementEncoder.encodePreparedStatementForHeader(entry.getValue()));
response.header(protocolHeaders.responseAddedPrepare(), encodedKey + '=' + encodedValue);
}

// add deallocated prepare statements
for (String name : query.getDeallocatedPreparedStatements()) {
for (String name : resultsResponse.deallocatedPreparedStatements()) {
response.header(protocolHeaders.responseDeallocatedPrepare(), urlEncode(name));
}

// add new transaction ID
query.getStartedTransactionId()
resultsResponse.startedTransactionId()
.ifPresent(transactionId -> response.header(protocolHeaders.responseStartedTransactionId(), transactionId));

// add clear transaction ID directive
if (query.isClearTransactionId()) {
if (resultsResponse.clearTransactionId()) {
response.header(protocolHeaders.responseClearTransactionId(), true);
}

Expand Down
103 changes: 33 additions & 70 deletions core/trino-main/src/main/java/io/trino/server/protocol/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.client.ClientCapabilities;
import io.trino.client.Column;
import io.trino.client.FailureInfo;
import io.trino.client.ProtocolHeaders;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.exchange.ExchangeDataSource;
Expand Down Expand Up @@ -273,76 +272,23 @@ public QueryInfo getQueryInfo()
return queryManager.getFullQueryInfo(queryId);
}

public ProtocolHeaders getProtocolHeaders()
public ListenableFuture<QueryResultsResponse> waitForResults(long token, UriInfo uriInfo, Duration wait, DataSize targetResultSize)
{
return session.getProtocolHeaders();
}

public synchronized Optional<String> getSetCatalog()
{
return setCatalog;
}

public synchronized Optional<String> getSetSchema()
{
return setSchema;
}

public synchronized Optional<String> getSetPath()
{
return setPath;
}

public synchronized Map<String, String> getSetSessionProperties()
{
return setSessionProperties;
}

public synchronized Set<String> getResetSessionProperties()
{
return resetSessionProperties;
}

public synchronized Map<String, SelectedRole> getSetRoles()
{
return setRoles;
}

public synchronized Map<String, String> getAddedPreparedStatements()
{
return addedPreparedStatements;
}

public synchronized Set<String> getDeallocatedPreparedStatements()
{
return deallocatedPreparedStatements;
}

public synchronized Optional<TransactionId> getStartedTransactionId()
{
return startedTransactionId;
}

public synchronized boolean isClearTransactionId()
{
return clearTransactionId;
}

public synchronized ListenableFuture<QueryResults> waitForResults(long token, UriInfo uriInfo, Duration wait, DataSize targetResultSize)
{
// before waiting, check if this request has already been processed and cached
Optional<QueryResults> cachedResult = getCachedResult(token);
if (cachedResult.isPresent()) {
return immediateFuture(cachedResult.get());
ListenableFuture<Void> futureStateChange;
synchronized (this) {
// before waiting, check if this request has already been processed and cached
Optional<QueryResults> cachedResult = getCachedResult(token);
if (cachedResult.isPresent()) {
return immediateFuture(toResultsResponse(cachedResult.get()));
}
// release the lock eagerly after acquiring the future to avoid contending with callback threads
futureStateChange = getFutureStateChange();
}

// wait for a results data or query to finish, up to the wait timeout
ListenableFuture<Void> futureStateChange = addTimeout(
getFutureStateChange(),
() -> null,
wait,
timeoutExecutor);

if (!futureStateChange.isDone()) {
futureStateChange = addTimeout(futureStateChange, () -> null, wait, timeoutExecutor);
}
// when state changes, fetch the next result
return Futures.transform(futureStateChange, ignored -> getNextResult(token, uriInfo, targetResultSize), resultsProcessorExecutor);
}
Expand Down Expand Up @@ -447,12 +393,12 @@ private synchronized Optional<QueryResults> getCachedResult(long token)
return Optional.empty();
}

private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, DataSize targetResultSize)
private synchronized QueryResultsResponse getNextResult(long token, UriInfo uriInfo, DataSize targetResultSize)
{
// check if the result for the token have already been created
Optional<QueryResults> cachedResult = getCachedResult(token);
if (cachedResult.isPresent()) {
return cachedResult.get();
return toResultsResponse(cachedResult.get());
}

verify(nextToken.isPresent(), "Cannot generate next result when next token is not present");
Expand Down Expand Up @@ -551,7 +497,24 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Dat
lastToken = token;
lastResult = queryResults;

return queryResults;
return toResultsResponse(queryResults);
}

private synchronized QueryResultsResponse toResultsResponse(QueryResults queryResults)
{
return new QueryResultsResponse(
setCatalog,
setSchema,
setPath,
setSessionProperties,
resetSessionProperties,
setRoles,
addedPreparedStatements,
deallocatedPreparedStatements,
startedTransactionId,
clearTransactionId,
session.getProtocolHeaders(),
queryResults);
}

private synchronized QueryResultRows removePagesFromExchange(QueryInfo queryInfo, long targetResultBytes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.server.protocol;

import io.trino.client.ProtocolHeaders;
import io.trino.client.QueryResults;
import io.trino.spi.security.SelectedRole;
import io.trino.transaction.TransactionId;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

record QueryResultsResponse(
Optional<String> setCatalog,
Optional<String> setSchema,
Optional<String> setPath,
Map<String, String> setSessionProperties,
Set<String> resetSessionProperties,
Map<String, SelectedRole> setRoles,
Map<String, String> addedPreparedStatements,
Set<String> deallocatedPreparedStatements,
Optional<TransactionId> startedTransactionId,
boolean clearTransactionId,
ProtocolHeaders protocolHeaders,
QueryResults queryResults)
{
QueryResultsResponse {
requireNonNull(setCatalog, "setCatalog is null");
requireNonNull(setSchema, "setSchema is null");
requireNonNull(setPath, "setPath is null");
requireNonNull(setSessionProperties, "setSessionProperties is null");
requireNonNull(resetSessionProperties, "resetSessionProperties is null");
requireNonNull(setRoles, "setRoles is null");
requireNonNull(addedPreparedStatements, "addedPreparedStatements is null");
requireNonNull(deallocatedPreparedStatements, "deallocatedPreparedStatements is null");
requireNonNull(startedTransactionId, "startedTransactionId is null");
requireNonNull(protocolHeaders, "protocolHeaders is null");
requireNonNull(queryResults, "queryResults is null");
}
}