Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
return doHandleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext,
httpHeaders, accessControl);
} finally {
Tracing.ThreadAccountantOps.clear();
_resourceUsageAccountant.clear();
}
}

Expand Down Expand Up @@ -401,7 +401,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
(compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs());
// Accounts for resource usage of the compilation phase, since compilation for some queries can be expensive.
Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);

// Second-stage table-level access control
// TODO: Modify AccessControl interface to directly take PinotQuery
Expand Down Expand Up @@ -442,7 +442,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
System.nanoTime() - compilationEndTimeNs);
// Accounts for resource usage of the authorization phase.
Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);

if (!authorizationResult.hasAccess()) {
throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult);
Expand Down Expand Up @@ -693,7 +693,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
routingEndTimeNs - routingStartTimeNs);
// Account the resource used for routing phase, since for single stage queries with multiple segments, routing
// can be expensive.
Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);

// Set timeout in the requests
long timeSpentMs = TimeUnit.NANOSECONDS.toMillis(routingEndTimeNs - compilationStartTimeNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,13 @@ public static void sampleAndCheckInterruption() {
sample();
}

public static void sampleAndCheckInterruption(ThreadResourceUsageAccountant accountant) {
if (Thread.interrupted() || accountant.isAnchorThreadInterrupted() || accountant.isQueryTerminated()) {
throw new EarlyTerminationException("Interrupted while merging records");
}
accountant.sampleUsage();
}

@Deprecated
public static void updateQueryUsageConcurrently(String queryId) {
}
Expand Down
Loading