Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

import com.google.common.collect.ImmutableMap;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.utils.ResourceUsageUtils;


/**
Expand Down Expand Up @@ -58,9 +57,7 @@ public SystemResourceInfo() {
_totalMemoryMB = runtime.totalMemory() / MEGA_BYTES;
}

MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
_maxHeapSizeMB = heapMemoryUsage.getMax() / MEGA_BYTES;
_maxHeapSizeMB = ResourceUsageUtils.getMaxHeapSize() / MEGA_BYTES;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryManagerMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.Obfuscator;
import org.apache.pinot.spi.utils.ResourceUsageUtils;


/**
Expand Down Expand Up @@ -126,12 +126,9 @@ private JVMConfig buildJVMConfig() {
}

private RuntimeConfig buildRuntimeConfig() {
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();

ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
return new RuntimeConfig(threadMXBean.getTotalStartedThreadCount(), threadMXBean.getThreadCount(),
FileUtils.byteCountToDisplaySize(heapMemoryUsage.getMax()),
MemoryUsage heapMemoryUsage = ResourceUsageUtils.getHeapMemoryUsage();
return new RuntimeConfig(ThreadResourceUsageProvider.getTotalStartedThreadCount(),
ThreadResourceUsageProvider.getThreadCount(), FileUtils.byteCountToDisplaySize(heapMemoryUsage.getMax()),
FileUtils.byteCountToDisplaySize(heapMemoryUsage.getUsed()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pinot.core.accounting;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.pinot.common.metrics.ServerGauge;
Expand All @@ -30,6 +28,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ResourceUsageUtils;


/**
Expand All @@ -46,7 +45,6 @@ public ThreadResourceUsageAccountant init(PinotConfiguration config, String inst
}

public static class HeapUsagePublishingResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
private final Timer _timer;
private final int _period;

Expand All @@ -56,8 +54,7 @@ public HeapUsagePublishingResourceUsageAccountant(int period) {
}

public void publishHeapUsageMetrics() {
ServerMetrics.get()
.setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES, MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed());
ServerMetrics.get().setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES, ResourceUsageUtils.getUsedHeapSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -57,6 +55,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ResourceUsageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -78,11 +77,6 @@ public ThreadResourceUsageAccountant init(PinotConfiguration config, String inst
}

public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResourceUsageAccountant {

/**
* MemoryMXBean to get total heap used memory
*/
static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
private static final Logger LOGGER = LoggerFactory.getLogger(PerQueryCPUMemResourceUsageAccountant.class);
private static final boolean IS_DEBUG_MODE_ENABLED = LOGGER.isDebugEnabled();
/**
Expand Down Expand Up @@ -326,22 +320,6 @@ public boolean isQueryTerminated() {
return false;
}

@Override
@Deprecated
public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
@Nullable ThreadExecutionContext parentContext) {
}

@Override
@Deprecated
public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
}

@Override
@Deprecated
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long memoryAllocatedBytes) {
}

@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long memoryAllocatedBytes,
TrackingScope trackingScope) {
Expand All @@ -359,11 +337,6 @@ public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long me
}
}

@Override
@Deprecated
public void updateQueryUsageConcurrently(String queryId) {
}

/**
* The thread would need to do {@code setThreadResourceUsageProvider} first upon it is scheduled.
* This is to be called from a worker or a runner thread to update its corresponding cpu usage entry
Expand All @@ -386,13 +359,6 @@ public void sampleThreadBytesAllocated() {
}
}

@Deprecated
@Override
public void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType taskType) {
setupRunner(queryId, taskId, taskType, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
}


@Override
public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
String workloadName) {
Expand Down Expand Up @@ -559,8 +525,8 @@ public void reapFinishedTasks() {

Thread thread = entry.getKey();
if (!thread.isAlive()) {
LOGGER.debug("Thread: {} is no longer alive, removing it from _threadEntriesMap", thread.getName());
_threadEntriesMap.remove(thread);
LOGGER.debug("Removing thread from _threadLocalEntry: {}", thread.getName());
}
}
_cancelSentQueries = cancellingQueries;
Expand Down Expand Up @@ -701,7 +667,7 @@ public class WatcherTask implements Runnable, PinotClusterConfigChangeListener {
private final AbstractMetrics.Gauge _memoryUsageGauge;

WatcherTask() {
_queryMonitorConfig.set(new QueryMonitorConfig(_config, MEMORY_MX_BEAN.getHeapMemoryUsage().getMax()));
_queryMonitorConfig.set(new QueryMonitorConfig(_config, ResourceUsageUtils.getMaxHeapSize()));
logQueryMonitorConfig();

switch (_instanceType) {
Expand Down Expand Up @@ -835,7 +801,7 @@ public void runOnce() {
}

private void collectTriggerMetrics() {
_usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
_usedBytes = ResourceUsageUtils.getUsedHeapSize();
LOGGER.debug("Heap used bytes {}", _usedBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.pinot.core.accounting;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -42,6 +40,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ResourceUsageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,8 +55,6 @@
public class QueryAggregator implements ResourceAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryAggregator.class);

static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();

enum TriggeringLevel {
Normal, HeapMemoryAlarmingVerbose, CPUTimeBasedKilling, HeapMemoryCritical, HeapMemoryPanic
}
Expand All @@ -81,7 +78,7 @@ enum TriggeringLevel {
private final String _instanceId;

// max heap usage, Xmx
private final long _maxHeapSize = MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
private final long _maxHeapSize = ResourceUsageUtils.getMaxHeapSize();

// don't kill a query if its memory footprint is below some ratio of _maxHeapSize
private final long _minMemoryFootprintForKill;
Expand Down Expand Up @@ -420,7 +417,7 @@ private void killMostExpensiveQuery() {
Thread.sleep(_gcWaitTime);
} catch (InterruptedException ignored) {
}
_usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
_usedBytes = ResourceUsageUtils.getUsedHeapSize();
if (_usedBytes < _criticalLevelAfterGC) {
return;
}
Expand Down Expand Up @@ -637,7 +634,7 @@ public void cleanUpPostAggregation() {
}

private void collectTriggerMetrics() {
_usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
_usedBytes = ResourceUsageUtils.getUsedHeapSize();
LOGGER.debug("Heap used bytes {}", _usedBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,35 @@
import java.util.List;


/**
* Interface for aggregating CPU and memory usage of threads.
*/
/// Interface for aggregating CPU and memory usage of threads.
public interface ResourceAggregator {

/**
* Update CPU usage for one-off cases where identifier is known before-hand. For example: broker inbound netty
* thread where queryId and workloadName are already known.
*
* @param name identifier name - workload name, queryId, etc.
* @param cpuTimeNs CPU time in nanoseconds
*/
public void updateConcurrentCpuUsage(String name, long cpuTimeNs);
/// Updates CPU usage for one-off cases where identifier is known beforehand. For example: broker inbound netty thread
/// where queryId and workloadName are already known.
///
/// @param name identifier name - queryId, workload name, etc.
/// @param cpuTimeNs CPU time in nanoseconds
void updateConcurrentCpuUsage(String name, long cpuTimeNs);

/**
* Update CPU usage for one-off cases where identifier is known before-hand. For example: broker inbound netty
* @param name identifier name - workload name, queryId, etc.
* @param memBytes memory usage in bytes
*/
public void updateConcurrentMemUsage(String name, long memBytes);
/// Updates memory usage for one-off cases where identifier is known beforehand. For example: broker inbound netty
/// thread where queryId and workloadName are already known.
///
/// @param name identifier name - queryId, workload name, etc.
/// @param memBytes memory usage in bytes
void updateConcurrentMemUsage(String name, long memBytes);

// Cleanup of state after periodic aggregation is complete.
public void cleanUpPostAggregation();
/// Cleans up state after periodic aggregation is complete.
void cleanUpPostAggregation();

// Sleep time between aggregations.
public int getAggregationSleepTimeMs();
/// Sleep time between aggregations.
int getAggregationSleepTimeMs();

// Pre-aggregation step to be called before the aggregation of all thread entries.
public void preAggregate(List<CPUMemThreadLevelAccountingObjects.ThreadEntry> anchorThreadEntries);
/// Pre-aggregation step to be called before the aggregation of all thread entries.
void preAggregate(List<CPUMemThreadLevelAccountingObjects.ThreadEntry> anchorThreadEntries);

// Aggregation of each thread entry
public void aggregate(Thread thread, CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry);
/// Aggregates on a thread entry.
void aggregate(Thread thread, CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry);

// Post-aggregation step to be called after the aggregation of all thread entries.
public void postAggregate();
/// Post-aggregation step to be called after the aggregation of all thread entries.
void postAggregate();
}
Loading
Loading