Skip to content

Commit

Permalink
Add runtimeStats to track read calls to underlying storage
Browse files Browse the repository at this point in the history
  • Loading branch information
NikhilCollooru authored and highker committed Mar 8, 2022
1 parent 9faf7d5 commit f63dfbc
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ private RuntimeMetricName()
public static final String TASK_SCHEDULED_TIME_NANOS = "taskScheduledTimeNanos";
// Blocked time for the operators due to waiting for inputs.
public static final String TASK_BLOCKED_TIME_NANOS = "taskBlockedTimeNanos";
// Time taken for a read call to storage
public static final String STORAGE_READ_TIME_NANOS = "storageReadTimeNanos";
// Size of the data retrieved by read call to storage
public static final String STORAGE_READ_DATA_BYTES = "storageReadDataBytes";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.LazyBlock;
import com.facebook.presto.common.block.LazyBlockLoader;
Expand Down Expand Up @@ -53,6 +54,8 @@
import java.util.function.Supplier;

import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
import static com.facebook.presto.common.RuntimeMetricName.STORAGE_READ_DATA_BYTES;
import static com.facebook.presto.common.RuntimeMetricName.STORAGE_READ_TIME_NANOS;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -344,10 +347,16 @@ private void recordInputStats()
long endCompletedPositions = pageSource.getCompletedPositions();
long endReadTimeNanos = pageSource.getReadTimeNanos();
long inputBytes = endCompletedBytes - completedBytes;
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
long positionCount = endCompletedPositions - completedPositions;
operatorContext.recordProcessedInput(inputBytes, positionCount);
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, endReadTimeNanos - readTimeNanos);
operatorContext.updateStats(pageSource.getRuntimeStats());
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
RuntimeStats runtimeStats = pageSource.getRuntimeStats();
if (runtimeStats != null) {
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_TIME_NANOS, inputBytesReadTime);
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_DATA_BYTES, inputBytes);
operatorContext.updateStats(runtimeStats);
}
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.operator;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ColumnHandle;
Expand All @@ -38,6 +39,8 @@
import java.util.function.Supplier;

import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
import static com.facebook.presto.common.RuntimeMetricName.STORAGE_READ_DATA_BYTES;
import static com.facebook.presto.common.RuntimeMetricName.STORAGE_READ_TIME_NANOS;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -272,10 +275,16 @@ private void recordInputStats()
long endCompletedPositions = source.getCompletedPositions();
long endReadTimeNanos = source.getReadTimeNanos();
long inputBytes = endCompletedBytes - completedBytes;
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
long positionCount = endCompletedPositions - completedPositions;
operatorContext.recordProcessedInput(inputBytes, positionCount);
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, endReadTimeNanos - readTimeNanos);
operatorContext.updateStats(source.getRuntimeStats());
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
RuntimeStats runtimeStats = source.getRuntimeStats();
if (runtimeStats != null) {
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_TIME_NANOS, inputBytesReadTime);
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_DATA_BYTES, inputBytes);
operatorContext.updateStats(runtimeStats);
}
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
Expand Down

0 comments on commit f63dfbc

Please sign in to comment.