Skip to content

Commit

Permalink
Update peak memory in TaskContext for Presto-on-Spark task
Browse files Browse the repository at this point in the history
The peak memory stats in TaskContext only get updated when
TaskContext::getTaskStats(). Presto-on-Spark doesn't pull task stats
actively so it needs to be updated explicitly.
  • Loading branch information
wenleix committed Sep 25, 2020
1 parent c9e550b commit 6b97e58
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public TaskStats getTaskStats()
pipelineStats);
}

private void updatePeakMemory()
public void updatePeakMemory()
{
long userMemory = taskMemoryContext.getUserMemory();
long systemMemory = taskMemoryContext.getSystemMemory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -63,6 +64,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* The PrestoSparkTaskExecution is a simplified version of SqlTaskExecution.
Expand Down Expand Up @@ -102,7 +104,8 @@ public PrestoSparkTaskExecution(
LocalExecutionPlan localExecutionPlan,
TaskExecutor taskExecutor,
SplitMonitor splitMonitor,
Executor notificationExecutor)
Executor notificationExecutor,
ScheduledExecutorService memoryUpdateExecutor)
{
this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null");
this.taskId = taskStateMachine.getTaskId();
Expand Down Expand Up @@ -139,7 +142,8 @@ public PrestoSparkTaskExecution(

taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor);

// TODO: periodically call taskContext::updatePeakMemory
requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null");
memoryUpdateExecutor.schedule(taskContext::updatePeakMemory, 1, SECONDS);
}

// this is a separate method to ensure that the `this` reference is not leaked during construction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public class PrestoSparkTaskExecutorFactory

private final Executor notificationExecutor;
private final ScheduledExecutorService yieldExecutor;
private final ScheduledExecutorService memoryUpdateExecutor;

private final LocalExecutionPlanner localExecutionPlanner;
private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
Expand Down Expand Up @@ -164,6 +165,7 @@ public PrestoSparkTaskExecutorFactory(
JsonCodec<TaskInfo> taskInfoJsonCodec,
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
ScheduledExecutorService memoryUpdateExecutor,
LocalExecutionPlanner localExecutionPlanner,
PrestoSparkExecutionExceptionFactory executionExceptionFactory,
TaskExecutor taskExecutor,
Expand All @@ -183,6 +185,7 @@ public PrestoSparkTaskExecutorFactory(
taskInfoJsonCodec,
notificationExecutor,
yieldExecutor,
memoryUpdateExecutor,
localExecutionPlanner,
executionExceptionFactory,
taskExecutor,
Expand All @@ -208,6 +211,7 @@ public PrestoSparkTaskExecutorFactory(
JsonCodec<TaskInfo> taskInfoJsonCodec,
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
ScheduledExecutorService memoryUpdateExecutor,
LocalExecutionPlanner localExecutionPlanner,
PrestoSparkExecutionExceptionFactory executionExceptionFactory,
TaskExecutor taskExecutor,
Expand All @@ -231,6 +235,7 @@ public PrestoSparkTaskExecutorFactory(
this.taskInfoJsonCodec = requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.memoryUpdateExecutor = requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null");
this.localExecutionPlanner = requireNonNull(localExecutionPlanner, "localExecutionPlanner is null");
this.executionExceptionFactory = requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
Expand Down Expand Up @@ -436,7 +441,8 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
localExecutionPlan,
taskExecutor,
splitMonitor,
notificationExecutor);
notificationExecutor,
memoryUpdateExecutor);

taskExecution.start(taskSources);

Expand Down

0 comments on commit 6b97e58

Please sign in to comment.