Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.core.accounting;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Collection;
Expand All @@ -31,6 +33,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -41,6 +44,7 @@
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.accounting.MseCancelCallback;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
Expand Down Expand Up @@ -111,6 +115,8 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>();
protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>();

// Cancel callbacks keyed by query id. Entries are added by registerMseCancelCallback(), looked up (and invalidated
// after use) by cancelQuery(), and also invalidated in cleanInactive(). The cache itself is built in the constructor
// with a configurable maximum size and an expire-after-write timeout.
Cache<String, MseCancelCallback> _queryCancelCallbacks;

protected final ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadLocalEntry
= ThreadLocal.withInitial(() -> {
CPUMemThreadLevelAccountingObjects.ThreadEntry ret =
Expand Down Expand Up @@ -188,6 +194,14 @@ public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String i
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE);

_queryCancelCallbacks = CacheBuilder.newBuilder().maximumSize(
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE,
CommonConstants.Accounting.DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE)).expireAfterWrite(
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CANCEL_CALLBACK_CACHE_EXPIRY_SECONDS,
CommonConstants.Accounting.DEFAULT_CANCEL_CALLBACK_CACHE_EXPIRY_SECONDS),
TimeUnit.SECONDS) // backstop for stuck entries
.build();

// task/query tracking
_inactiveQuery = new HashSet<>();
_cancelSentQueries = new HashSet<>();
Expand Down Expand Up @@ -277,6 +291,16 @@ public boolean throttleQuerySubmission() {
return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
}

/**
 * Caches {@code callback} under {@code queryId} in {@code _queryCancelCallbacks}.
 * {@code cancelQuery(String, Thread)} invokes the cached callback, when one is present, instead of interrupting the
 * anchor thread.
 *
 * @param queryId key under which the callback is cached
 * @param callback callback to cache for this query
 */
@Override
public void registerMseCancelCallback(String queryId, MseCancelCallback callback) {
_queryCancelCallbacks.put(queryId, callback);
}

/**
 * Looks up the cancel callback cached for {@code queryId}.
 *
 * @param queryId key the callback was registered under
 * @return the cached callback, or {@code null} when the cache currently holds no entry for this id
 */
@Nullable
public MseCancelCallback getQueryCancelCallback(String queryId) {
return _queryCancelCallbacks.getIfPresent(queryId);
}

@Override
public boolean isAnchorThreadInterrupted() {
ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus();
Expand Down Expand Up @@ -428,6 +452,7 @@ public void cleanInactive() {
_concurrentTaskMemStatsAggregator.remove(inactiveQueryId);
}
_cancelSentQueries.remove(inactiveQueryId);
_queryCancelCallbacks.invalidate(inactiveQueryId);
}
_inactiveQuery.clear();
if (_isThreadCPUSamplingEnabled) {
Expand Down Expand Up @@ -528,9 +553,15 @@ protected void logQueryResourceUsage(Map<String, ? extends QueryResourceTracker>
LOGGER.warn("Query aggregation results {} for the previous kill.", aggregatedUsagePerActiveQuery);
}

protected void cancelQuery(AggregatedStats queryResourceTracker) {
_cancelSentQueries.add(queryResourceTracker.getQueryId());
queryResourceTracker.getAnchorThread().interrupt();
/**
 * Cancels the given query and records that a cancel was sent for it.
 *
 * <p>When no cancel callback is cached for {@code queryId}, the anchor thread is interrupted. Otherwise the cached
 * callback is invoked with the id parsed as a {@code long} and its cache entry is invalidated. In both cases the id
 * is then added to {@code _cancelSentQueries}.
 *
 * @param queryId id of the query to cancel; parsed with {@link Long#parseLong(String)} only on the callback path
 * @param anchorThread thread to interrupt when no callback is cached for the query
 */
public void cancelQuery(String queryId, Thread anchorThread) {
  MseCancelCallback registeredCallback = _queryCancelCallbacks.getIfPresent(queryId);
  if (registeredCallback == null) {
    // No callback cached for this id: fall back to interrupting the anchor thread.
    anchorThread.interrupt();
  } else {
    registeredCallback.cancelQuery(Long.parseLong(queryId));
    // The callback has been used; drop it from the cache.
    _queryCancelCallbacks.invalidate(queryId);
  }
  _cancelSentQueries.add(queryId);
}

protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage) {
Expand Down Expand Up @@ -865,10 +896,8 @@ void killAllQueries() {
for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus();
if (taskEntry != null && taskEntry.isAnchorThread()) {
threadEntry._errorStatus
.set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType)));
taskEntry.getAnchorThread().interrupt();
if (taskEntry != null && !_cancelSentQueries.contains(taskEntry.getQueryId())) {
cancelQuery(taskEntry.getQueryId(), taskEntry.getAnchorThread());
killedCount += 1;
}
}
Expand Down Expand Up @@ -924,7 +953,7 @@ private void killMostExpensiveQuery() {
maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
String.format(" Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple);
terminateQuery(maxUsageTuple);
logTerminatedQuery(maxUsageTuple, _usedBytes);
} else if (!config.isOomKillQueryEnabled()) {
LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
Expand Down Expand Up @@ -952,15 +981,15 @@ private void killCPUTimeExceedQueries() {
String.format("Query %s got killed on %s: %s because using %d "
+ "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId,
value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS())));
cancelQuery(value);
terminateQuery(value);
logTerminatedQuery(value, _usedBytes);
}
}
logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
}

private void interruptRunnerThread(AggregatedStats queryResourceTracker) {
cancelQuery(queryResourceTracker);
private void terminateQuery(AggregatedStats queryResourceTracker) {
cancelQuery(queryResourceTracker.getQueryId(), queryResourceTracker.getAnchorThread());
if (_queryMonitorConfig.get().isQueryKilledMetricEnabled()) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public WatcherTask createWatcherTask() {
}

@Override
public void cancelQuery(AggregatedStats queryResourceTracker) {
_cancelSentQueries.add(queryResourceTracker.getQueryId());
_cancelLog.add(queryResourceTracker.getQueryId());
// Override that only records the cancellation: the query id is added to _cancelSentQueries and appended to
// _cancelLog. anchorThread is not used here, so this override interrupts no thread and invokes no callback.
public void cancelQuery(String queryId, Thread anchorThread) {
_cancelSentQueries.add(queryId);
_cancelLog.add(queryId);
}

public List<String> getCancelLog() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
import org.apache.pinot.spi.accounting.MseCancelCallback;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -141,6 +142,10 @@ public class QueryRunner {
@Nullable
private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;
private BooleanSupplier _sendStats;
// Accountant assigned in init(); processQuery() registers _mseCancelCallback with it.
private ThreadResourceUsageAccountant _resourceUsageAccountant;
// Forwards a request id to _opChainScheduler.cancel(). Written as a lambda so that _opChainScheduler is read each
// time the callback runs rather than once when this field is initialized.
// NOTE(review): presumably the scheduler is assigned after construction (e.g. in init()) - confirm before ever
// replacing this with a method reference.
private final MseCancelCallback _mseCancelCallback = requestId -> _opChainScheduler.cancel(requestId);

/**
* Initializes the query executor.
Expand Down Expand Up @@ -247,6 +252,7 @@ public void init(PinotConfiguration serverConf, InstanceDataManager instanceData

_sendStats = sendStats;

_resourceUsageAccountant = resourceUsageAccountant;
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
}

Expand All @@ -267,6 +273,9 @@ public void shutDown() {
/// If any error happened during the asynchronous execution, an error block will be sent to all receiver mailboxes.
/// Before the work is submitted, the cancel callback is registered with the resource usage accountant under the
/// request id read from [QueryThreadContext] on the calling thread.
public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
    Map<String, String> requestMetadata, @Nullable ThreadExecutionContext parentContext) {
  // The accountant keys callbacks by the string form of the request id.
  long requestId = QueryThreadContext.getRequestId();
  _resourceUsageAccountant.registerMseCancelCallback(Long.toString(requestId), _mseCancelCallback);

  // The blocking execution runs on the runner's executor service.
  return CompletableFuture.runAsync(
      () -> processQueryBlocking(workerMetadata, stagePlan, requestMetadata, parentContext), _executorService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -56,14 +57,19 @@ public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
}

/**
 * Creates an enclosure with a newly constructed {@link Tracing.DefaultThreadResourceUsageAccountant} by delegating
 * to the three-argument constructor.
 *
 * @param factory factory handed on to the three-argument constructor
 * @param config configuration entries handed on to the three-argument constructor
 */
public QueryServerEnclosure(MockInstanceDataManagerFactory factory, Map<String, Object> config) {
this(factory, config, new Tracing.DefaultThreadResourceUsageAccountant());
}

/**
 * Creates an enclosure whose {@link QueryRunner} is initialized on a free port, with hostname
 * {@code "Server_localhost"}, an instance data manager built from {@code factory}, and the given accountant.
 *
 * @param factory factory used to build the {@link InstanceDataManager} passed to the runner
 * @param config base configuration entries; copied before the hostname and port entries are added
 * @param accountant resource usage accountant passed to {@code QueryRunner.init}
 */
public QueryServerEnclosure(MockInstanceDataManagerFactory factory, Map<String, Object> config,
ThreadResourceUsageAccountant accountant) {
_queryRunnerPort = QueryTestUtils.getAvailablePort();
// Work on a copy so that adding the hostname/port entries does not modify the caller's map.
Map<String, Object> runnerConfig = new HashMap<>(config);
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, "Server_localhost");
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
InstanceDataManager instanceDataManager = factory.buildInstanceDataManager();
_queryRunner = new QueryRunner();
_queryRunner.init(new PinotConfiguration(runnerConfig), instanceDataManager, null, () -> true,
// NOTE(review): the next two lines both terminate the init(...) call. The first looks like the pre-change argument
// left over from the diff view and the second like its replacement that passes the injected accountant - confirm
// and keep only one.
new Tracing.DefaultThreadResourceUsageAccountant());
accountant);
}

public int getPort() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.queries;

import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.trace.Tracing;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Runs the {@link QueryRunnerAccountingTest} setup against {@link Tracing.DefaultThreadResourceUsageAccountant} and
 * checks that this accountant reports no thread or query resources after a query has run.
 */
public class DefaultAccountantTest extends QueryRunnerAccountingTest {
  /** Supplies a newly constructed default accountant to the base test. */
  @Override
  protected ThreadResourceUsageAccountant getThreadResourceUsageAccountant() {
    return new Tracing.DefaultThreadResourceUsageAccountant();
  }

  /**
   * Executes a two-row query while {@code Tracing.getThreadAccountant()} is stubbed to return a default accountant,
   * then asserts that the accountant's thread and query resource collections are both empty.
   */
  @Test
  void testWithDefaultThreadAccountant() {
    Tracing.DefaultThreadResourceUsageAccountant defaultAccountant =
        new Tracing.DefaultThreadResourceUsageAccountant();
    // Only getThreadAccountant() is stubbed; every other static on Tracing keeps its real implementation.
    try (MockedStatic<Tracing> mockedTracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
      mockedTracing.when(Tracing::getThreadAccountant).thenReturn(defaultAccountant);

      ResultTable rows = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
      Assert.assertEquals(rows.getRows().size(), 2);

      ThreadResourceUsageAccountant activeAccountant = Tracing.getThreadAccountant();
      Assert.assertTrue(activeAccountant.getThreadResources().isEmpty());
      Assert.assertTrue(activeAccountant.getQueryResources().isEmpty());
    }
  }
}
Loading
Loading