[SPARK-9103][WIP] Add Memory Tracking UI and track Netty memory usage #17625

Closed
wants to merge 9 commits
@@ -88,6 +88,8 @@ private static class ClientPool {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;

public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; }

public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
@@ -53,6 +53,7 @@ public class TransportServer implements Closeable {

private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
private PooledByteBufAllocator allocator;
private int port = -1;

/**
@@ -78,6 +79,8 @@ public TransportServer(
}
}

public PooledByteBufAllocator getAllocator() { return allocator; }

public int getPort() {
if (port == -1) {
throw new IllegalStateException("Server not initialized");
@@ -92,7 +95,7 @@ private void init(String hostToBind, int portToBind) {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

bootstrap = new ServerBootstrap()
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -36,6 +37,7 @@ import org.apache.spark.util._
*/
private[spark] case class Heartbeat(
executorId: String,
executorMetrics: ExecutorMetrics,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)

@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, executorMetrics, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId)
executorId, executorMetrics, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -63,6 +64,7 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
@@ -383,6 +385,7 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
securityManager,
metricsSystem,
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -102,6 +102,12 @@ private[spark] class Executor(
env.blockManager.initialize(conf.getAppId)
}

private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
executorMetrics.setHostname(Utils.localHostName)
if (env.rpcEnv.address != null) {
executorMetrics.setPort(Some(env.rpcEnv.address.port))
}

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

@@ -704,7 +710,21 @@ private[spark] class Executor(
}
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
env.blockTransferService.updateMemMetrics(this.executorMetrics)
val executorMetrics = if (isLocal) {
// When running locally, there is a chance that the executorMetrics could change
// out from under us. So, copy them here. In non-local mode this object would be
// serialized and de-serialized on its way to the driver. Perform that operation here
// to obtain the same result as non-local mode.
// TODO - Add a test that fails in local mode if we don't copy executorMetrics here.
Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
} else {
this.executorMetrics
}

val message = Heartbeat(
executorId, executorMetrics, accumUpdates.toArray, env.blockManager.blockManagerId)

try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
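The serialize/deserialize copy above is what makes local mode observe the same object the driver would receive after RPC transport. A minimal sketch of that round-trip-copy pattern using plain Java serialization (Utils.serialize / Utils.deserialize are Spark-internal helpers built on the same mechanism):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Sketch only: copy an object by serializing and deserializing it, mimicking what
// happens to the heartbeat payload on its way to the driver in non-local mode.
def roundTripCopy[T <: Serializable](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}
```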
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
*/
@DeveloperApi
class ExecutorMetrics extends Serializable {

private var _hostname: String = ""
def hostname: String = _hostname
private[spark] def setHostname(value: String) = _hostname = value

private var _port: Option[Int] = None
def port: Option[Int] = _port
private[spark] def setPort(value: Option[Int]) = _port = value

private[spark] def hostPort: String = {
port match {
case None => hostname
case value => hostname + ":" + value.get
}
}

private var _transportMetrics: TransportMetrics =
new TransportMetrics(System.currentTimeMillis(), 0L, 0L)
def transportMetrics: TransportMetrics = _transportMetrics
private[spark] def setTransportMetrics(value: TransportMetrics) = {
_transportMetrics = value
}
}

object ExecutorMetrics extends Serializable {
def apply(
hostName: String,
port: Option[Int],
transportMetrics: TransportMetrics): ExecutorMetrics = {
val execMetrics = new ExecutorMetrics
execMetrics.setHostname(hostName)
execMetrics.setPort(port)
execMetrics.setTransportMetrics(transportMetrics)
execMetrics
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class TransportMetrics (
val timeStamp: Long,
val onHeapSize: Long,
val offHeapSize: Long) extends Serializable
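A minimal usage sketch of the new class, grounded only in the members defined above; the host, port, and sizes are illustrative values:

```scala
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

// Build an instance via the companion apply (illustrative values).
val metrics = ExecutorMetrics(
  hostName = "worker-1",
  port = Some(7337),
  transportMetrics = TransportMetrics(System.currentTimeMillis(), 16L * 1024 * 1024, 0L))

// The public accessors expose what the heartbeat carries to the driver.
println(s"${metrics.hostname}:${metrics.port.getOrElse(-1)}, " +
  s"onHeap=${metrics.transportMetrics.onHeapSize}, offHeap=${metrics.transportMetrics.offHeapSize}")
```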

@@ -24,6 +24,7 @@ import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
@@ -39,6 +40,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
*/
def init(blockDataManager: BlockDataManager): Unit

/**
* Collect the transfer service's current memory metrics into the given executorMetrics.
*/
private[spark] def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit

/**
* Tear down the transfer service.
*/
@@ -18,12 +18,16 @@
package org.apache.spark.network.netty

import java.nio.ByteBuffer
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag

import org.apache.spark.{SecurityManager, SparkConf}
import io.netty.buffer._

import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
@@ -34,7 +38,7 @@ import org.apache.spark.network.shuffle.protocol.UploadBlock
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* A BlockTransferService that uses Netty to fetch a set of blocks at time.
@@ -57,6 +61,38 @@ private[spark] class NettyBlockTransferService(
private[this] var server: TransportServer = _
private[this] var clientFactory: TransportClientFactory = _
private[this] var appId: String = _
private[this] var clock: Clock = new SystemClock()

/**
* Use a different clock for this transfer service. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}

private[spark] override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = {
val currentTime = clock.getTimeMillis()
val clientPooledAllocator = clientFactory.getPooledAllocator()
val serverAllocator = server.getAllocator()
val clientOffHeapSize = sumOfMetrics(clientPooledAllocator.directArenas())
val clientOnHeapSize = sumOfMetrics(clientPooledAllocator.heapArenas())
val serverOffHeapSize = sumOfMetrics(serverAllocator.directArenas())
val serverOnHeapSize = sumOfMetrics(serverAllocator.heapArenas())
logDebug(s"Current Netty Client offheap size is $clientOffHeapSize, " +
s"Client heap size is $clientOnHeapSize, Server offheap size is $serverOffHeapSize, " +
s"server heap size is $serverOnHeapSize, executor id is " +
s"${SparkEnv.get.blockManager.blockManagerId.executorId}")
executorMetrics.setTransportMetrics(TransportMetrics(currentTime,
clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize))
}

private def sumOfMetrics(arenaMetricList: JList[PoolArenaMetric]): Long = {
// Sum chunkSize() over every chunk in each arena's chunk lists.
arenaMetricList.asScala.map { arena =>
arena.chunkLists().asScala.map { chunkList =>
chunkList.iterator().asScala.map(_.chunkSize()).sum
}.sum
}.sum
}
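For context, a standalone sketch of the same arena -> chunk list -> chunk traversal against Netty's metric API; PooledByteBufAllocator.DEFAULT stands in here for the client and server allocators obtained above:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import io.netty.buffer.{PoolArenaMetric, PooledByteBufAllocator}

// Sketch only: sum chunkSize() over every chunk in every arena, as sumOfMetrics does.
def chunkBytes(arenas: JList[PoolArenaMetric]): Long =
  arenas.asScala.map { arena =>
    arena.chunkLists().asScala.map(_.iterator().asScala.map(_.chunkSize().toLong).sum).sum
  }.sum

val alloc = PooledByteBufAllocator.DEFAULT
println(s"pooled on-heap=${chunkBytes(alloc.heapArenas())} bytes, " +
  s"off-heap=${chunkBytes(alloc.directArenas())} bytes")
```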

override def init(blockDataManager: BlockDataManager): Unit = {
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
@@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
@@ -229,16 +229,18 @@ class DAGScheduler(
}

/**
* Update metrics for in-progress tasks and let the master know that the BlockManager is still
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
* Update metrics for the live executor and in-progress tasks, and let the master know that the
* BlockManager is still alive. Return true if the driver knows about the given block manager.
* Otherwise, return false, indicating that the block manager should re-register.
*/
def executorHeartbeatReceived(
execId: String,
executorMetrics: ExecutorMetrics,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
listenerBus.post(
SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
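Downstream, a listener can read the metrics from the enriched event. A hypothetical sketch, assuming the event exposes the new value as executorMetrics: Option[ExecutorMetrics] (this hunk does not show the field name):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener; the `executorMetrics` field name on the event is an assumption.
class NettyMemoryListener extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    update.executorMetrics.foreach { m =>
      val t = m.transportMetrics
      println(s"executor ${update.execId}: onHeap=${t.onHeapSize} B, offHeap=${t.offHeapSize} B")
    }
  }
}
```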