Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ protected PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, boole
_instanceType = instanceType;
_cancelSentQueries = new HashSet<>();
_watcherTask = createWatcherTask();
_queryCancelCallbacks = CacheBuilder.newBuilder().build();
}

public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId,
Expand Down Expand Up @@ -564,10 +565,11 @@ public void cancelQuery(String queryId, Thread anchorThread) {
_cancelSentQueries.add(queryId);
}

protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage) {
LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}",
protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage,
boolean hasCallback) {
LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}. Used Callback: {}",
queryResourceTracker.getQueryId(), queryResourceTracker.getAllocatedBytes(),
queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage);
queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage, hasCallback);
}

@Override
Expand Down Expand Up @@ -661,7 +663,6 @@ public class WatcherTask implements Runnable, PinotClusterConfigChangeListener {

protected long _usedBytes;
protected int _sleepTime;
protected int _numQueriesKilledConsecutively = 0;
protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
protected TriggeringLevel _triggeringLevel;

Expand Down Expand Up @@ -744,10 +745,7 @@ private void logQueryMonitorConfig() {
LOGGER.info("_instanceType is {}", _instanceType);
LOGGER.info("_alarmingLevel of on heap memory is {}", queryMonitorConfig.getAlarmingLevel());
LOGGER.info("_criticalLevel of on heap memory is {}", queryMonitorConfig.getCriticalLevel());
LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", queryMonitorConfig.getCriticalLevelAfterGC());
LOGGER.info("_panicLevel of on heap memory is {}", queryMonitorConfig.getPanicLevel());
LOGGER.info("_gcBackoffCount is {}", queryMonitorConfig.getGcBackoffCount());
LOGGER.info("_gcWaitTime is {}", queryMonitorConfig.getGcWaitTime());
LOGGER.info("_normalSleepTime is {}", queryMonitorConfig.getNormalSleepTime());
LOGGER.info("_alarmingSleepTime is {}", queryMonitorConfig.getAlarmingSleepTime());
LOGGER.info("_oomKillQueryEnabled: {}", queryMonitorConfig.isOomKillQueryEnabled());
Expand Down Expand Up @@ -873,10 +871,8 @@ protected void triggeredActions() {
case HeapMemoryAlarmingVerbose:
LOGGER.warn("Heap used bytes {} exceeds alarming level", _usedBytes);
LOGGER.warn("Query usage aggregation results {}", _aggregatedUsagePerActiveQuery.toString());
_numQueriesKilledConsecutively = 0;
break;
default:
_numQueriesKilledConsecutively = 0;
break;
}
}
Expand Down Expand Up @@ -904,13 +900,6 @@ void killAllQueries() {
if (config.isQueryKilledMetricEnabled()) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
}
try {
Thread.sleep(config.getNormalSleepTime());
} catch (InterruptedException ignored) {
}
// In this extreme case we directly trigger system.gc
System.gc();
_numQueriesKilledConsecutively = 0;
}
}

Expand All @@ -925,21 +914,6 @@ private void killMostExpensiveQuery() {
return;
}
QueryMonitorConfig config = _queryMonitorConfig.get();
if (_aggregatedUsagePerActiveQuery != null && !_aggregatedUsagePerActiveQuery.isEmpty()
&& _numQueriesKilledConsecutively >= config.getGcBackoffCount()) {
_numQueriesKilledConsecutively = 0;
System.gc();
try {
Thread.sleep(config.getGcWaitTime());
} catch (InterruptedException ignored) {
}
_usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
if (_usedBytes < config.getCriticalLevelAfterGC()) {
return;
}
LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}", _usedBytes,
config.getCriticalLevelAfterGC());
}
// Critical heap memory usage while no queries running
if (_aggregatedUsagePerActiveQuery != null && !_aggregatedUsagePerActiveQuery.isEmpty()) {
AggregatedStats maxUsageTuple;
Expand All @@ -953,8 +927,9 @@ private void killMostExpensiveQuery() {
maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
String.format(" Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
boolean hasCallBack = _queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
terminateQuery(maxUsageTuple);
logTerminatedQuery(maxUsageTuple, _usedBytes);
logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
} else if (!config.isOomKillQueryEnabled()) {
LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
+ "because oomKillQueryEnabled is false", maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
Expand All @@ -981,8 +956,9 @@ private void killCPUTimeExceedQueries() {
String.format("Query %s got killed on %s: %s because using %d "
+ "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId,
value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS())));
boolean hasCallBack = _queryCancelCallbacks.getIfPresent(value.getQueryId()) != null;
terminateQuery(value);
logTerminatedQuery(value, _usedBytes);
logTerminatedQuery(value, _usedBytes, hasCallBack);
}
}
logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
Expand All @@ -993,7 +969,6 @@ private void terminateQuery(AggregatedStats queryResourceTracker) {
if (_queryMonitorConfig.get().isQueryKilledMetricEnabled()) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
}
_numQueriesKilledConsecutively += 1;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,12 @@ public class QueryMonitorConfig {
// kill the most expensive query if heap usage exceeds this
private final long _criticalLevel;

// if after gc the heap usage is still above this, kill the most expensive query
// use this to prevent heap size oscillation and repeatedly triggering gc
private final long _criticalLevelAfterGC;

// trigger gc if consecutively kill more than some number of queries
// set this to 0 to always trigger gc before killing a query to give gc a second chance
// as would minimize the chance of false positive killing in some usecases
// should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for some gc algorithms
private final int _gcBackoffCount;

// start to sample more frequently if heap usage exceeds this
private final long _alarmingLevel;

// normal sleep time
private final int _normalSleepTime;

// wait for gc to complete, according to system.gc() javadoc, when control returns from the method call,
// the Java Virtual Machine has made a best effort to reclaim space from all discarded objects.
// Therefore, we default this to 0.
// Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
private final int _gcWaitTime;

// alarming sleep time denominator, should be > 1 to sample more frequent at alarming level
private final int _alarmingSleepTimeDenominator;

Expand Down Expand Up @@ -94,23 +78,13 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
(long) (maxHeapSize * config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));

_criticalLevelAfterGC = _criticalLevel - (long) (maxHeapSize * config.getProperty(
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));

_gcBackoffCount = config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);

_alarmingLevel =
(long) (maxHeapSize * config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO));

_normalSleepTime = config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);

_gcWaitTime = config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);

_alarmingSleepTimeDenominator = config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR,
CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);

Expand Down Expand Up @@ -174,30 +148,6 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_criticalLevel = oldConfig._criticalLevel;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)) {
_criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize
* CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC);
} else {
_criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize * Double.parseDouble(
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)));
}
} else {
_criticalLevelAfterGC = oldConfig._criticalLevelAfterGC;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) {
_gcBackoffCount = CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT;
} else {
_gcBackoffCount = Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT));
}
} else {
_gcBackoffCount = oldConfig._gcBackoffCount;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) {
Expand All @@ -220,16 +170,6 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_normalSleepTime = oldConfig._normalSleepTime;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)) {
_gcWaitTime = CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS;
} else {
_gcWaitTime = Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS));
}
} else {
_gcWaitTime = oldConfig._gcWaitTime;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) {
Expand Down Expand Up @@ -323,14 +263,6 @@ public long getCriticalLevel() {
return _criticalLevel;
}

public long getCriticalLevelAfterGC() {
return _criticalLevelAfterGC;
}

public int getGcBackoffCount() {
return _gcBackoffCount;
}

public long getAlarmingLevel() {
return _alarmingLevel;
}
Expand All @@ -339,10 +271,6 @@ public int getNormalSleepTime() {
return _normalSleepTime;
}

public int getGcWaitTime() {
return _gcWaitTime;
}

public int getAlarmingSleepTime() {
return _alarmingSleepTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ public class QueryMonitorConfigTest {
private static final double EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL = 0.05;
private static final double EXPECTED_PANIC_LEVEL = 0.9f;
private static final double EXPECTED_CRITICAL_LEVEL = 0.95f;
private static final double EXPECTED_CRITICAL_LEVEL_AFTER_GC = 0.05f;
private static final int EXPECTED_GC_BACKOFF_COUNT = 3;
private static final double EXPECTED_ALARMING_LEVEL = 0.8f;
private static final int EXPECTED_NORMAL_SLEEP_TIME = 50;
private static final int EXPECTED_GC_WAIT_TIME = 1000;
private static final int EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR = 2;
private static final boolean EXPECTED_OOM_KILL_QUERY_ENABLED = true;
private static final boolean EXPECTED_PUBLISH_HEAP_USAGE_METRIC = true;
Expand Down Expand Up @@ -70,9 +67,6 @@ public void setUp() {
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO),
Double.toString(EXPECTED_CRITICAL_LEVEL));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC),
Double.toString(EXPECTED_CRITICAL_LEVEL_AFTER_GC));
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO),
Double.toString(EXPECTED_ALARMING_LEVEL));
Expand All @@ -83,10 +77,6 @@ public void setUp() {
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO),
Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT),
Integer.toString(EXPECTED_GC_BACKOFF_COUNT));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS),
Integer.toString(EXPECTED_GC_WAIT_TIME));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
}
Expand Down Expand Up @@ -177,23 +167,6 @@ void testCriticalLevelHeapUsageRatioConfigChange() {
EXPECTED_CRITICAL_LEVEL * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
}

@Test
void testCriticalLevelHeapUsageRatioDeltaAfterGCConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test",
InstanceType.SERVER);

assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
- CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC
* accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
accountant.getWatcherTask().onChange(Set.of(getFullyQualifiedConfigName(
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)), CLUSTER_CONFIGS);
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
- EXPECTED_CRITICAL_LEVEL_AFTER_GC * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
}

@Test
void testAlarmingLevelHeapUsageRatioConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
Expand Down Expand Up @@ -258,34 +231,6 @@ void testMinMemoryFootprintToKillRatioConfigChange() {
.getMaxHeapSize()));
}

@Test
void testGCBackoffCountConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test",
InstanceType.SERVER);

assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(),
CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
accountant.getWatcherTask()
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)),
CLUSTER_CONFIGS);
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(), EXPECTED_GC_BACKOFF_COUNT);
}

@Test
void testGCWaitTimeConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test",
InstanceType.SERVER);

assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(),
CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
accountant.getWatcherTask()
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)),
CLUSTER_CONFIGS);
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(), EXPECTED_GC_WAIT_TIME);
}

@Test
void testQueryKilledMetricEnabledConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
Expand Down
Loading