Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -415,7 +416,7 @@ public Map<Integer, String> getExceptions() {
@Override
public byte[] toBytes()
throws IOException {
ThreadResourceUsageProvider threadTimer = new ThreadResourceUsageProvider();
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
Expand All @@ -425,13 +426,12 @@ public byte[] toBytes()
// TODO: The check on cpu time and memory measurement is not needed. We can remove it. But keeping it around for
// backward compatibility.
if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
String.valueOf(resourceSnapshot.getCpuTimeNs()));
}
if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
long responseSerializationAllocatedBytes = threadTimer.getThreadAllocatedBytes();
getMetadata().put(MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES.getName(),
String.valueOf(responseSerializationAllocatedBytes));
String.valueOf(resourceSnapshot.getAllocatedBytes()));
}

// Write metadata: length followed by actual metadata bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.utils.CommonConstants;

Expand All @@ -43,6 +44,9 @@ public static class ThreadEntry implements ThreadResourceTracker {
volatile long _currentThreadCPUTimeSampleMS = 0;
volatile long _currentThreadMemoryAllocationSampleBytes = 0;

// reference point for start time/bytes
private final ThreadResourceSnapshot _threadResourceSnapshot = new ThreadResourceSnapshot();

// previous query_id, task_id of the thread, this field should only be accessed by the accountant
TaskEntry _previousThreadTaskStatus = null;
// previous cpu time and memory allocation of the thread
Expand Down Expand Up @@ -113,6 +117,20 @@ public ThreadExecutionContext.TaskType getTaskType() {
public void setThreadTaskStatus(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
    @Nonnull Thread anchorThread) {
  // Publish the new task descriptor for this thread first ...
  final TaskEntry taskEntry = new TaskEntry(queryId, taskId, taskType, anchorThread);
  _currentThreadTaskStatus.set(taskEntry);
  // ... then reset the snapshot that serves as the reference point for later CPU/memory samples.
  // NOTE(review): presumably reset() re-bases the readings to "now" on the calling thread - confirm against
  // ThreadResourceSnapshot.
  _threadResourceSnapshot.reset();
}

/**
 * Records the CPU time reported by this entry's resource snapshot as the current CPU sample.
 *
 * <p>Unit caveat: despite the {@code MS} suffix, {@code _currentThreadCPUTimeSampleMS} holds a value in
 * nanoseconds, not milliseconds. The misleading field name is kept for backward compatibility, because the
 * code this method replaces also stored nanoseconds in that field.
 */
public void updateCpuSnapshot() {
  final long cpuTimeNs = _threadResourceSnapshot.getCpuTimeNs();
  _currentThreadCPUTimeSampleMS = cpuTimeNs;
}

/**
 * Records the allocated-bytes reading from this entry's resource snapshot as the current memory sample
 * ({@code _currentThreadMemoryAllocationSampleBytes}).
 */
public void updateMemorySnapshot() {
  final long allocatedBytes = _threadResourceSnapshot.getAllocatedBytes();
  _currentThreadMemoryAllocationSampleBytes = allocatedBytes;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
}
);

// ThreadResourceUsageProvider(ThreadMXBean wrapper) per runner/worker thread
private final ThreadLocal<ThreadResourceUsageProvider> _threadResourceUsageProvider;

// track thread cpu time
private final boolean _isThreadCPUSamplingEnabled;

Expand Down Expand Up @@ -168,9 +165,6 @@ public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String i
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE);

// ThreadMXBean wrapper
_threadResourceUsageProvider = new ThreadLocal<>();

// task/query tracking
_inactiveQuery = new HashSet<>();

Expand Down Expand Up @@ -277,29 +271,35 @@ public int getEntryCount() {
}

@Override
public void updateQueryUsageConcurrently(String queryId) {
@Deprecated
public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
  // Intentionally a no-op: this deprecated hook ignores the supplied provider.
}

@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long memoryAllocatedBytes) {
if (_isThreadCPUSamplingEnabled) {
long cpuUsageNS = getThreadResourceUsageProvider().getThreadTimeNs();
_concurrentTaskCPUStatsAggregator.compute(queryId,
(key, value) -> (value == null) ? cpuUsageNS : (value + cpuUsageNS));
(key, value) -> (value == null) ? cpuTimeNs : (value + cpuTimeNs));
}
if (_isThreadMemorySamplingEnabled) {
long memoryAllocatedBytes = getThreadResourceUsageProvider().getThreadAllocatedBytes();
_concurrentTaskMemStatsAggregator.compute(queryId,
(key, value) -> (value == null) ? memoryAllocatedBytes : (value + memoryAllocatedBytes));
}
}

/**
 * Intentionally a no-op; this overload no longer records any usage for the query.
 *
 * @param queryId id of the query; ignored
 * @deprecated use {@link #updateQueryUsageConcurrently(String, long, long)}, which takes the CPU time and
 *     allocated bytes measured by the caller.
 */
@Override
@Deprecated
public void updateQueryUsageConcurrently(String queryId) {
}

/**
* The thread would need to do {@code setThreadResourceUsageProvider} first upon it is scheduled.
* This is to be called from a worker or a runner thread to update its corresponding cpu usage entry
*/
@SuppressWarnings("ConstantConditions")
public void sampleThreadCPUTime() {
ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
if (_isThreadCPUSamplingEnabled && provider != null) {
_threadLocalEntry.get()._currentThreadCPUTimeSampleMS = provider.getThreadTimeNs();
if (_isThreadCPUSamplingEnabled) {
_threadLocalEntry.get().updateCpuSnapshot();
}
}

Expand All @@ -309,21 +309,11 @@ public void sampleThreadCPUTime() {
*/
@SuppressWarnings("ConstantConditions")
public void sampleThreadBytesAllocated() {
ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
if (_isThreadMemorySamplingEnabled && provider != null) {
_threadLocalEntry.get()._currentThreadMemoryAllocationSampleBytes = provider.getThreadAllocatedBytes();
if (_isThreadMemorySamplingEnabled) {
_threadLocalEntry.get().updateMemorySnapshot();
}
}

private ThreadResourceUsageProvider getThreadResourceUsageProvider() {
return _threadResourceUsageProvider.get();
}

@Override
public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
_threadResourceUsageProvider.set(threadResourceUsageProvider);
}

@Override
public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) {
_threadLocalEntry.get()._errorStatus.set(null);
Expand Down Expand Up @@ -362,8 +352,6 @@ public void clear() {
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get();
// clear task info + stats
threadEntry.setToIdle();
// clear threadResourceUsageProvider
_threadResourceUsageProvider.remove();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
Expand Down Expand Up @@ -102,13 +102,12 @@ protected InstanceResponseBlock buildInstanceResponseBlock(BaseResultsBlock base

protected BaseResultsBlock getBaseBlock() {
long startWallClockTimeNs = System.nanoTime();
ThreadResourceUsageProvider mainThreadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();

BaseResultsBlock resultsBlock = getCombinedResults();

// No-ops if CPU time measurement and/or memory allocation measurements are not enabled.
long mainThreadCpuTimeNs = mainThreadResourceUsageProvider.getThreadTimeNs();
long mainThreadMemAllocatedBytes = mainThreadResourceUsageProvider.getThreadAllocatedBytes();
long mainThreadCpuTimeNs = resourceSnapshot.getCpuTimeNs();
long mainThreadMemAllocatedBytes = resourceSnapshot.getAllocatedBytes();

long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.pinot.core.util.QueryMultiThreadingUtils;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
Expand Down Expand Up @@ -103,8 +103,7 @@ protected void startProcess() {
_futures[i] = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();

ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);

// Register the task to the phaser
Expand Down Expand Up @@ -136,8 +135,8 @@ public void runJob() {
Tracing.ThreadAccountantOps.clear();
}

_totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs());
_totalWorkerThreadMemAllocatedBytes.getAndAdd(threadResourceUsageProvider.getThreadAllocatedBytes());
_totalWorkerThreadCpuTimeNs.getAndAdd(resourceSnapshot.getCpuTimeNs());
_totalWorkerThreadMemAllocatedBytes.getAndAdd(resourceSnapshot.getAllocatedBytes());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,16 +63,17 @@ public void channelInactive(ChannelHandlerContext ctx) {

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
Tracing.ThreadAccountantOps.setThreadResourceUsageProvider();
int responseSize = msg.readableBytes();
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_RECEIVED, responseSize);
try {
long deserializationStartTimeMs = System.currentTimeMillis();
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
DataTable dataTable = DataTableFactory.getDataTable(msg.nioBuffer());
_queryRouter.receiveDataTable(_serverRoutingInstance, dataTable, responseSize,
(int) (System.currentTimeMillis() - deserializationStartTimeMs));
long requestID = Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName()));
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID));
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID),
resourceSnapshot.getCpuTimeNs(), resourceSnapshot.getAllocatedBytes());
} catch (Exception e) {
LOGGER.error("Caught exception while deserializing data table of size: {} from server: {}", responseSize,
_serverRoutingInstance, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,11 +50,11 @@ public void setup() {
@Test
public void testThreadMXBeanSimpleMemAllocTracking() {
if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
long[] ll = new long[10000];
ll[2] = 4;
LOGGER.trace(String.valueOf(ll[2]));
long result = threadResourceUsageProvider.getThreadAllocatedBytes();
long result = threadResourceSnapshot.getAllocatedBytes();
Assert.assertTrue(result >= 80000 && result <= 85000);
}
}
Expand All @@ -75,37 +76,37 @@ public void testThreadMXBeanMultithreadMemAllocTracking() {
System.gc();

long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot();
executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 0; i < 100000; i++) {
concurrentHashMap.put(i, i);
}
a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
a.set(threadResourceSnapshot.getAllocatedBytes());
});

executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 100000; i < 200000; i++) {
concurrentHashMap.put(i, i);
}
b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
b.set(threadResourceSnapshot.getAllocatedBytes());
});

executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 0; i < 200000; i++) {
concurrentHashMap2.put(i, i);
}
c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
c.set(threadResourceSnapshot.getAllocatedBytes());
});

try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}

long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
Expand All @@ -132,37 +133,37 @@ public void testThreadMXBeanDeepMemAllocTracking() {
System.gc();

long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot();
executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 0; i < 100; i++) {
concurrentHashMap.put(i, new NestedArray());
}
a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
a.set(threadResourceSnapshot.getAllocatedBytes());
});

executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 100; i < 200; i++) {
concurrentHashMap.put(i, new NestedArray());
}
b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
b.set(threadResourceSnapshot.getAllocatedBytes());
});

executor.submit(() -> {
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot();
for (int i = 0; i < 200; i++) {
concurrentHashMap2.put(i, new NestedArray());
}
c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
c.set(threadResourceSnapshot.getAllocatedBytes());
});

try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}

long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
Expand All @@ -181,14 +182,14 @@ public void testThreadMXBeanMemAllocGCTracking() {
LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.gc();
ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot();
long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
for (int i = 0; i < 3; i++) {
long[] ignored = new long[100000000];
}
System.gc();
long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
long result = threadResourceUsageProvider0.getThreadAllocatedBytes();
long result = threadResourceSnapshot0.getAllocatedBytes();
LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}",
result, heapResult);
}
Expand Down
Loading
Loading