Skip to content

Commit

Permalink
Add peak task user memory to TaskStats
Browse files Browse the repository at this point in the history
  • Loading branch information
wenleix committed Sep 25, 2020
1 parent 46bcc63 commit ebf428a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public class TaskContext

private final Object cumulativeMemoryLock = new Object();
private final AtomicDouble cumulativeUserMemory = new AtomicDouble(0.0);

private final AtomicLong peakTotalMemoryInBytes = new AtomicLong(0);
private final AtomicLong peakUserMemoryInBytes = new AtomicLong(0);

@GuardedBy("cumulativeMemoryLock")
private long lastUserMemoryReservation;
Expand Down Expand Up @@ -498,7 +500,7 @@ public TaskStats getTaskStats()
long userMemory = taskMemoryContext.getUserMemory();
long systemMemory = taskMemoryContext.getSystemMemory();

peakTotalMemoryInBytes.accumulateAndGet(userMemory + systemMemory, Math::max);
updatePeakMemory();

synchronized (cumulativeMemoryLock) {
double sinceLastPeriodMillis = (System.nanoTime() - lastTaskStatCallNanos) / 1_000_000.0;
Expand Down Expand Up @@ -538,6 +540,7 @@ public TaskStats getTaskStats()
taskMemoryContext.getRevocableMemory(),
systemMemory,
peakTotalMemoryInBytes.get(),
peakUserMemoryInBytes.get(),
totalScheduledTime,
totalCpuTime,
totalBlockedTime,
Expand All @@ -556,6 +559,15 @@ public TaskStats getTaskStats()
pipelineStats);
}

private void updatePeakMemory()
{
long userMemory = taskMemoryContext.getUserMemory();
long systemMemory = taskMemoryContext.getSystemMemory();

peakTotalMemoryInBytes.accumulateAndGet(userMemory + systemMemory, Math::max);
peakUserMemoryInBytes.accumulateAndGet(userMemory, Math::max);
}

public <C, R> R accept(QueryContextVisitor<C, R> visitor, C context)
{
return visitor.visitTaskContext(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class TaskStats
private final long userMemoryReservationInBytes;
private final long revocableMemoryReservationInBytes;
private final long systemMemoryReservationInBytes;

private final long peakUserMemoryInBytes;
private final long peakTotalMemoryInBytes;

private final long totalScheduledTimeInNanos;
Expand Down Expand Up @@ -79,7 +81,8 @@ public class TaskStats

public TaskStats(DateTime createTime, DateTime endTime)
{
this(createTime,
this(
createTime,
null,
null,
null,
Expand All @@ -97,7 +100,8 @@ public TaskStats(DateTime createTime, DateTime endTime)
0L,
0L,
0L,
0,
0L,
0L,
0L,
0L,
0L,
Expand Down Expand Up @@ -138,7 +142,9 @@ public TaskStats(
@JsonProperty("userMemoryReservation") long userMemoryReservationInBytes,
@JsonProperty("revocableMemoryReservationInBytes") long revocableMemoryReservationInBytes,
@JsonProperty("systemMemoryReservationInBytes") long systemMemoryReservationInBytes,

@JsonProperty("peakTotalMemoryInBytes") long peakTotalMemoryInBytes,
@JsonProperty("peakUserMemoryInBytes") long peakUserMemoryInBytes,

@JsonProperty("totalScheduledTimeInNanos") long totalScheduledTimeInNanos,
@JsonProperty("totalCpuTimeInNanos") long totalCpuTimeInNanos,
Expand Down Expand Up @@ -194,7 +200,9 @@ public TaskStats(
this.userMemoryReservationInBytes = userMemoryReservationInBytes;
this.revocableMemoryReservationInBytes = revocableMemoryReservationInBytes;
this.systemMemoryReservationInBytes = systemMemoryReservationInBytes;

this.peakTotalMemoryInBytes = peakTotalMemoryInBytes;
this.peakUserMemoryInBytes = peakUserMemoryInBytes;

this.totalScheduledTimeInNanos = totalScheduledTimeInNanos;
this.totalCpuTimeInNanos = totalCpuTimeInNanos;
Expand Down Expand Up @@ -325,6 +333,12 @@ public long getSystemMemoryReservationInBytes()
return systemMemoryReservationInBytes;
}

@JsonProperty
public long getPeakUserMemoryInBytes()
{
return peakUserMemoryInBytes;
}

@JsonProperty
public long getPeakTotalMemoryInBytes()
{
Expand Down Expand Up @@ -461,6 +475,7 @@ public TaskStats summarize()
revocableMemoryReservationInBytes,
systemMemoryReservationInBytes,
peakTotalMemoryInBytes,
peakUserMemoryInBytes,
totalScheduledTimeInNanos,
totalCpuTimeInNanos,
totalBlockedTimeInNanos,
Expand Down Expand Up @@ -501,6 +516,7 @@ public TaskStats summarizeFinal()
revocableMemoryReservationInBytes,
systemMemoryReservationInBytes,
peakTotalMemoryInBytes,
peakUserMemoryInBytes,
totalScheduledTimeInNanos,
totalCpuTimeInNanos,
totalBlockedTimeInNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TestTaskStats
13,
14,
26,
27,
15,
16,
18,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public PrestoSparkTaskExecution(
"Fragment is partitioned, but not all partitioned drivers were found");

taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor);

// TODO: periodically call taskContext::updatePeakMemory
}

// this is a separate method to ensure that the `this` reference is not leaked during construction
Expand Down

0 comments on commit ebf428a

Please sign in to comment.