Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
// track memory usage
protected final boolean _isThreadMemorySamplingEnabled;

// is sampling allowed for MSE queries
protected final boolean _isThreadSamplingEnabledForMSE;

protected final Set<String> _inactiveQuery;

protected Set<String> _cancelSentQueries;
Expand All @@ -148,7 +145,6 @@ protected PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, boole
_config = config;
_isThreadCPUSamplingEnabled = isThreadCPUSamplingEnabled;
_isThreadMemorySamplingEnabled = isThreadMemorySamplingEnabled;
_isThreadSamplingEnabledForMSE = isThreadSamplingEnabledForMSE;
_inactiveQuery = inactiveQuery;
_instanceId = instanceId;
_instanceType = instanceType;
Expand Down Expand Up @@ -184,11 +180,6 @@ public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String i
LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled);

_isThreadSamplingEnabledForMSE =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE);

_queryCancelCallbacks = CacheBuilder.newBuilder().maximumSize(
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE,
CommonConstants.Accounting.DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE)).expireAfterWrite(
Expand Down Expand Up @@ -274,17 +265,6 @@ public void sampleUsage() {
sampleThreadCPUTime();
}

/**
* Sample Usage for Multi-stage engine queries
*/
@Override
public void sampleUsageMSE() {
if (_isThreadSamplingEnabledForMSE) {
sampleThreadBytesAllocated();
sampleThreadCPUTime();
}
}

@Override
public boolean throttleQuerySubmission() {
return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ public static class ResourceUsageAccountant implements ThreadResourceUsageAccoun
// track memory usage
private final boolean _isThreadMemorySamplingEnabled;

// is sampling allowed for MSE queries
private final boolean _isThreadSamplingEnabledForMSE;

private final WatcherTask _watcherTask;

private final EnumMap<TrackingScope, ResourceAggregator> _resourceAggregators;
Expand All @@ -109,11 +106,6 @@ public ResourceUsageAccountant(PinotConfiguration config, String instanceId, Ins
LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled);

_isThreadSamplingEnabledForMSE =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE);

_watcherTask = new WatcherTask();

_resourceAggregators = new EnumMap<>(TrackingScope.class);
Expand All @@ -138,14 +130,6 @@ public void sampleUsage() {
sampleThreadCPUTime();
}

@Override
public void sampleUsageMSE() {
if (_isThreadSamplingEnabledForMSE) {
sampleThreadBytesAllocated();
sampleThreadCPUTime();
}
}

@Override
public boolean isAnchorThreadInterrupted() {
ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void sampleAndCheckInterruption(long deadlineMs) {
earlyTerminate();
throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " + getExplainName());
}
Tracing.ThreadAccountantOps.sampleMSE();
Tracing.ThreadAccountantOps.sample();
if (Tracing.ThreadAccountantOps.isInterrupted()) {
earlyTerminate();
throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Resource limit exceeded for operator: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static void setUpClass() {
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
// init accountant and start watcher task
Tracing.unregisterThreadAccountant();
Tracing.ThreadAccountantOps.initializeThreadAccountant(new PinotConfiguration(configs), "testGroupBy",
Tracing.ThreadAccountantOps.createThreadAccountant(new PinotConfiguration(configs), "testGroupBy",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static void setUpClass() {
// init accountant and start watcher task
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.unregisterThreadAccountant();
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg, "testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

// Setup Thread Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.Executors;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
Expand Down Expand Up @@ -98,48 +97,6 @@ void testWithPerQueryAccountantFactory() {
}
}

@Test
void testDisableSamplingForMSE() {
HashMap<String, Object> configs = getAccountingConfig();
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false);

ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs),
"testWithPerQueryAccountantFactory", InstanceType.SERVER);

try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
Assert.assertEquals(resultTable.getRows().size(), 2);

Map<String, ? extends QueryResourceTracker> resources = accountant.getQueryResources();
Assert.assertEquals(resources.size(), 1);
Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(), 0);
}
}

@Test
void testDisableSamplingWithResourceUsageAccountantForMSE() {
HashMap<String, Object> configs = getAccountingConfig();
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false);

ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
ResourceUsageAccountantFactory.ResourceUsageAccountant accountant =
new ResourceUsageAccountantFactory.ResourceUsageAccountant(new PinotConfiguration(configs),
"testWithPerQueryAccountantFactory", InstanceType.SERVER);

try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
Assert.assertEquals(resultTable.getRows().size(), 2);

Map<String, ? extends QueryResourceTracker> resources = accountant.getQueryResources();
Assert.assertEquals(resources.size(), 1);
Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(), 0);
}
}

public static class InterruptingAccountant
extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
*/
void sampleUsage();

/**
* Sample Usage for Multi-stage engine queries
*/
void sampleUsageMSE();

default boolean throttleQuerySubmission() {
return false;
}
Expand Down
13 changes: 0 additions & 13 deletions pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ public void clear() {
public void sampleUsage() {
}

@Override
public void sampleUsageMSE() {
}

@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes,
TrackingScope trackingScope) {
Expand Down Expand Up @@ -295,19 +291,10 @@ public static void sample() {
Tracing.getThreadAccountant().sampleUsage();
}

public static void sampleMSE() {
Tracing.getThreadAccountant().sampleUsageMSE();
}

public static void clear() {
Tracing.getThreadAccountant().clear();
}

public static void initializeThreadAccountant(PinotConfiguration config, String instanceId,
InstanceType instanceType) {
createThreadAccountant(config, instanceId, instanceType);
}

public static ThreadResourceUsageAccountant createThreadAccountant(PinotConfiguration config, String instanceId,
InstanceType instanceType) {
_workloadBudgetManager = new WorkloadBudgetManager(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1538,9 +1538,6 @@ public static class Accounting {
public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled";
public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;

public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = "accounting.enable.thread.sampling.mse.debug";
public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true;

public static final String CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE = "accounting.cancel.callback.cache.max.size";
public static final int DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE = 500;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ public ThreadExecutionContext getThreadExecutionContext() {
public void sampleUsage() {
}

@Override
public void sampleUsageMSE() {
}

@Override
public boolean throttleQuerySubmission() {
return _numCalls.getAndIncrement() > 1;
Expand Down
Loading